From 0d0a8bd03323594e57bff70fccd9b2f5109c23df Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Wed, 20 Sep 2023 15:32:00 +0800 Subject: [PATCH] refactor(test): refactor bulk load function test --- src/block_service/local/local_service.cpp | 12 +- src/block_service/local/local_service.h | 10 + src/block_service/test/local_service_test.cpp | 1 + src/meta/meta_bulk_load_service.h | 4 + .../function_test/bulk_load/CMakeLists.txt | 3 +- .../bulk_load/test_bulk_load.cpp | 288 ++++++++++-------- 6 files changed, 176 insertions(+), 142 deletions(-) diff --git a/src/block_service/local/local_service.cpp b/src/block_service/local/local_service.cpp index 5eb4b3fba4..04cf1cc3d8 100644 --- a/src/block_service/local/local_service.cpp +++ b/src/block_service/local/local_service.cpp @@ -16,17 +16,16 @@ // under the License. #include -#include #include -#include #include +#include #include #include #include #include #include "local_service.h" -#include "nlohmann/detail/macro_scope.hpp" +#include "nlohmann/json.hpp" #include "nlohmann/json_fwd.hpp" #include "runtime/task/async_calls.h" #include "utils/autoref_ptr.h" @@ -52,13 +51,6 @@ namespace block_service { DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_BLOCK_SERVICE) -struct file_metadata -{ - uint64_t size; - std::string md5; -}; -NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5) - bool file_metadata_from_json(std::ifstream &fin, file_metadata &fmeta) noexcept { std::string data; diff --git a/src/block_service/local/local_service.h b/src/block_service/local/local_service.h index 9816734cf0..a32eebd982 100644 --- a/src/block_service/local/local_service.h +++ b/src/block_service/local/local_service.h @@ -17,6 +17,9 @@ #pragma once +#include +#include +#include #include #include #include @@ -32,6 +35,13 @@ class task_tracker; namespace dist { namespace block_service { +struct file_metadata +{ + int64_t size = 0; + std::string md5; +}; +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, 
md5) + class local_service : public block_filesystem { public: diff --git a/src/block_service/test/local_service_test.cpp b/src/block_service/test/local_service_test.cpp index e355a1b281..72a2012370 100644 --- a/src/block_service/test/local_service_test.cpp +++ b/src/block_service/test/local_service_test.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include "block_service/local/local_service.h" diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index ba151757a8..c411d87f44 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -97,6 +97,10 @@ struct bulk_load_info int32_t app_id; std::string app_name; int32_t partition_count; + bulk_load_info(int32_t id = 0, const std::string &name = "", int32_t pcount = 0) + : app_id(id), app_name(name), partition_count(pcount) + { + } DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count) }; diff --git a/src/test/function_test/bulk_load/CMakeLists.txt b/src/test/function_test/bulk_load/CMakeLists.txt index 92a677ddf8..a6eac08515 100644 --- a/src/test/function_test/bulk_load/CMakeLists.txt +++ b/src/test/function_test/bulk_load/CMakeLists.txt @@ -37,7 +37,8 @@ set(MY_PROJ_LIBS gssapi_krb5 krb5 function_test_utils - ) + rocksdb + test_utils) set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp b/src/test/function_test/bulk_load/test_bulk_load.cpp index f350c69453..845ae6539e 100644 --- a/src/test/function_test/bulk_load/test_bulk_load.cpp +++ b/src/test/function_test/bulk_load/test_bulk_load.cpp @@ -15,9 +15,15 @@ // specific language governing permissions and limitations // under the License. 
+#include // IWYU pragma: no_include // IWYU pragma: no_include #include +#include +#include +#include +#include +#include #include #include #include @@ -26,20 +32,24 @@ #include #include #include -#include #include "base/pegasus_const.h" +#include "block_service/local/local_service.h" #include "bulk_load_types.h" +#include "client/partition_resolver.h" #include "client/replication_ddl_client.h" +#include "common/json_helper.h" #include "include/pegasus/client.h" #include "include/pegasus/error.h" +#include "meta/meta_bulk_load_service.h" #include "meta_admin_types.h" -#include "metadata_types.h" #include "test/function_test/utils/test_util.h" +#include "utils/blob.h" +#include "utils/enum_helper.h" #include "utils/error_code.h" #include "utils/errors.h" #include "utils/filesystem.h" -#include "utils/utils.h" +#include "utils/test_macros.h" using namespace ::dsn; using namespace ::dsn::replication; @@ -55,12 +65,12 @@ using std::string; /// - `bulk_load_root` sub-directory stores right data /// - Please do not rename any files or directories under this folder /// -/// The app who is executing bulk load: -/// - app_name is `temp`, app_id is 2, partition_count is 8 +/// The app to test bulk load functionality: +/// - partition count should be 8 /// /// Data: -/// hashkey: hashi sortkey: sorti value: newValue i=[0, 1000] -/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000] +/// hashkey: hash${i} sortkey: sort${i} value: newValue i=[0, 1000] +/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000] /// class bulk_load_test : public test_util { @@ -68,80 +78,101 @@ class bulk_load_test : public test_util bulk_load_test() : test_util(map({{"rocksdb.allow_ingest_behind", "true"}})) { TRICKY_CODE_TO_AVOID_LINK_ERROR; - bulk_load_local_root_ = - utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT); + BULK_LOAD_LOCAL_APP_ROOT = + fmt::format("{}/{}/{}", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_); } void SetUp() 
override { test_util::SetUp(); - ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files()); + NO_FATALS(copy_bulk_load_files()); } void TearDown() override { ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0)); - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service")); + NO_FATALS(run_cmd_from_project_root("rm -rf " + LOCAL_BULK_LOAD_ROOT)); } - void copy_bulk_load_files() + // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'. + void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path) { - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service")); - ASSERT_NO_FATAL_FAILURE( - run_cmd_from_project_root("mkdir -p onebox/block_service/local_service")); - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root( - "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" + - LOCAL_ROOT + " onebox/block_service/local_service")); - string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) + - ",\"app_name\":\"temp\",\"partition_count\":8}' > " - "onebox/block_service/local_service/bulk_load_root/cluster/temp/" - "bulk_load_info"; - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd)); + blob value = dsn::json::json_forwarder::encode(bli); + auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(), + rocksdb::Slice(value.data(), value.length()), + bulk_load_info_path, + /* should_sync */ true); + ASSERT_TRUE(s.ok()) << s.ToString(); } - error_code start_bulk_load(bool ingest_behind = false) + // Generate the '.bulk_load_info.meta' file according to the 'bulk_load_info' file + // in path 'bulk_load_info_path'. 
+ void generate_bulk_load_info_meta(const std::string &bulk_load_info_path) { - auto err_resp = - ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind); - return err_resp.get_value().err; + dist::block_service::file_metadata fm; + ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, fm.size)); + ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5)); + std::string value = nlohmann::json(fm).dump(); + string bulk_load_info_meta_path = + fmt::format("{}/{}/{}/.bulk_load_info.meta", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_); + auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(), + rocksdb::Slice(value), + bulk_load_info_meta_path, + /* should_sync */ true); + ASSERT_TRUE(s.ok()) << s.ToString(); } - void remove_file(const string &file_path) + void copy_bulk_load_files() + { + // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we can generate it. + // Prepare bulk load files. + // The source data has 8 partitions. + ASSERT_EQ(8, partition_count_); + NO_FATALS(run_cmd_from_project_root("mkdir -p " + LOCAL_BULK_LOAD_ROOT)); + NO_FATALS(run_cmd_from_project_root( + fmt::format("cp -r {}/{} {}", SOURCE_FILES_ROOT, BULK_LOAD, LOCAL_SERVICE_ROOT))); + + // Generate 'bulk_load_info'. + string bulk_load_info_path = + fmt::format("{}/{}/{}/bulk_load_info", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_); + NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, partition_count_), + bulk_load_info_path)); + + // Generate '.bulk_load_info.meta'. 
+ NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path)); + } + + error_code start_bulk_load(bool ingest_behind = false) { - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm " + file_path)); + return ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, BULK_LOAD, ingest_behind) + .get_value() + .err; } - void replace_bulk_load_info() + void remove_file(const string &file_path) { - string cmd = "cp -R " - "src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" - "mock_bulk_load_info/. " + - bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + "/"; - ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd)); + NO_FATALS(run_cmd_from_project_root("rm " + file_path)); } void update_allow_ingest_behind(const string &allow_ingest_behind) { - // update app envs - std::vector keys; - keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND); - std::vector values; - values.emplace_back(allow_ingest_behind); - ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys, values).get_value().err); + const auto ret = ddl_client_->set_app_envs( + app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind}); + ASSERT_EQ(ERR_OK, ret.get_value().err); std::cout << "sleep 31s to wait app_envs update" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(31)); } - bulk_load_status::type wait_bulk_load_finish(int64_t seconds) + bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds) { int64_t sleep_time = 5; error_code err = ERR_OK; bulk_load_status::type last_status = bulk_load_status::BLS_INVALID; // when bulk load end, err will be ERR_INVALID_STATE - while (seconds > 0 && err == ERR_OK) { - sleep_time = sleep_time > seconds ? 
seconds : sleep_time; - seconds -= sleep_time; + while (remain_seconds > 0 && err == ERR_OK) { + sleep_time = std::min(sleep_time, remain_seconds); + remain_seconds -= sleep_time; std::cout << "sleep " << sleep_time << "s to query bulk status" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); @@ -156,13 +187,12 @@ class bulk_load_test : public test_util void verify_bulk_load_data() { - ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey")); - ASSERT_NO_FATAL_FAILURE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX)); + NO_FATALS(verify_data("hashkey", "sortkey")); + NO_FATALS(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX)); } void verify_data(const string &hashkey_prefix, const string &sortkey_prefix) { - const string &expected_value = VALUE; for (int i = 0; i < COUNT; ++i) { string hash_key = hashkey_prefix + std::to_string(i); for (int j = 0; j < COUNT; ++j) { @@ -170,7 +200,7 @@ class bulk_load_test : public test_util string act_value; ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, act_value)) << hash_key << "," << sort_key; - ASSERT_EQ(expected_value, act_value) << hash_key << "," << sort_key; + ASSERT_EQ(VALUE, act_value) << hash_key << "," << sort_key; } } } @@ -210,108 +240,104 @@ class bulk_load_test : public test_util } } -protected: - string bulk_load_local_root_; + void check_bulk_load(bool ingest_behind, + const std::string &value_before_bulk_load, + const std::string &value_after_bulk_load) + { + // write old data + NO_FATALS(operate_data(operation::SET, value_before_bulk_load, 10)); + NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10)); + + ASSERT_EQ(ERR_OK, start_bulk_load(ingest_behind)); + ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300)); + if (!ingest_behind) { + std::cout << "Start to verify data..." << std::endl; + NO_FATALS(verify_bulk_load_data()); + } + // values have been overwritten by bulk_load_data + const std::string &expect_value = ingest_behind ? 
value_before_bulk_load : VALUE; + NO_FATALS(operate_data(operation::GET, expect_value, 10)); + if (ingest_behind) { + NO_FATALS(verify_data("hashkey", "sortkey")); + } + + // write new data succeed after bulk load + NO_FATALS(operate_data(operation::SET, value_after_bulk_load, 20)); + NO_FATALS(operate_data(operation::GET, value_after_bulk_load, 20)); + + // delete data succeed after bulk load + NO_FATALS(operate_data(operation::DEL, "", 15)); + NO_FATALS(operate_data(operation::NO_VALUE, "", 15)); + } -protected: - string bulk_load_local_root_; +protected: + string BULK_LOAD_LOCAL_APP_ROOT; + const string SOURCE_FILES_ROOT = + "src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files"; + const string LOCAL_SERVICE_ROOT = "onebox/block_service/local_service"; + const string LOCAL_BULK_LOAD_ROOT = "onebox/block_service/local_service/bulk_load_root"; + const string BULK_LOAD = "bulk_load_root"; const string CLUSTER = "cluster"; const string PROVIDER = "local_service"; - const string HASHKEY_PREFIX = "hash"; const string SORTKEY_PREFIX = "sort"; const string VALUE = "newValue"; - const int32_t COUNT = 1000; + const int32_t COUNT = 100; }; -/// -/// case1: lack of `bulk_load_info` file -/// case2: `bulk_load_info` file inconsistent with app_info -/// -TEST_F(bulk_load_test, bulk_load_test_failed) +// Test bulk load failed because the 'bulk_load_info' file is missing +TEST_F(bulk_load_test, bulk_load_test_missing_bulk_load_info) { - // bulk load failed because `bulk_load_info` file is missing - ASSERT_NO_FATAL_FAILURE( - remove_file(bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + "/bulk_load_info")); + NO_FATALS(remove_file(BULK_LOAD_LOCAL_APP_ROOT + "/bulk_load_info")); ASSERT_EQ(ERR_OBJECT_NOT_FOUND, start_bulk_load()); +} - // bulk load failed because `bulk_load_info` file inconsistent with current app_info - ASSERT_NO_FATAL_FAILURE(replace_bulk_load_info()); - ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load()); +// Test bulk load failed 
because the 'bulk_load_info' file is inconsistent with the actual app info. +TEST_F(bulk_load_test, bulk_load_test_inconsistent_bulk_load_info) +{ + // Only 'app_id' and 'partition_count' will be checked in Pegasus server, so just inject these + // kind of inconsistencies. + bulk_load_info tests[] = {{app_id_ + 1, app_name_, partition_count_}, + {app_id_, app_name_, partition_count_ * 2}}; + for (const auto &test : tests) { + // Generate inconsistent 'bulk_load_info'. + string bulk_load_info_path = + fmt::format("{}/{}/{}/bulk_load_info", LOCAL_BULK_LOAD_ROOT, CLUSTER, app_name_); + NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path)); + + // Generate '.bulk_load_info.meta'. + NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path)); + + ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load()) << test.app_id << "," << test.app_name + << "," << test.partition_count; + } } -/// -/// case1: lack of `bulk_load_metadata` file -/// case2: bulk load succeed with data verfied -/// case3: bulk load data consistent: -/// - old data will be overrided by bulk load data -/// - get/set/del succeed after bulk load -/// -TEST_F(bulk_load_test, bulk_load_tests) +// Test bulk load failed because partition[0]'s 'bulk_load_metadata' file is missing. 
+TEST_F(bulk_load_test, bulk_load_test_missing_p0_bulk_load_metadata) { - // bulk load failed because partition[0] `bulk_load_metadata` file is missing - ASSERT_NO_FATAL_FAILURE(remove_file(bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + - "/0/bulk_load_metadata")); + NO_FATALS(remove_file(BULK_LOAD_LOCAL_APP_ROOT + "/0/bulk_load_metadata")); ASSERT_EQ(ERR_OK, start_bulk_load()); - // bulk load will get FAILED ASSERT_EQ(bulk_load_status::BLS_FAILED, wait_bulk_load_finish(300)); - - // recover complete files - ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files()); - - // write old data - ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10)); - - ASSERT_EQ(ERR_OK, start_bulk_load()); - ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300)); - std::cout << "Start to verify data..." << std::endl; - ASSERT_NO_FATAL_FAILURE(verify_bulk_load_data()); - - // value overide by bulk_loaded_data - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, VALUE, 10)); - - // write data after bulk load succeed - ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad", 20)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad", 20)); - - // del data after bulk load succeed - ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15)); } -/// -/// case1: inconsistent ingest_behind -/// case2: bulk load(ingest_behind) succeed with data verfied -/// case3: bulk load data consistent: -/// - bulk load data will be overrided by old data -/// - get/set/del succeed after bulk load -/// -TEST_F(bulk_load_test, bulk_load_ingest_behind_tests) +// Test bulk load failed because the allow_ingest_behind config is inconsistent. 
+TEST_F(bulk_load_test, allow_ingest_behind_inconsistent) { - ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("false")); - - // app envs allow_ingest_behind = false, request ingest_behind = true + NO_FATALS(update_allow_ingest_behind("false")); ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load(true)); +} - ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("true")); - - // write old data - ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10)); - - ASSERT_EQ(ERR_OK, start_bulk_load(true)); - ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300)); - - std::cout << "Start to verify data..." << std::endl; - // value overide by bulk_loaded_data - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10)); - ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey")); - - // write data after bulk load succeed - ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad", 20)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad", 20)); +// Test normal bulk load, old data will be overwritten by bulk load data. +TEST_F(bulk_load_test, bulk_load_tests) +{ + check_bulk_load(false, "oldValue", "valueAfterBulkLoad"); +} - // del data after bulk load succeed - ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15)); - ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15)); +// Test normal bulk load with allow_ingest_behind=true, old data will NOT be overwritten by bulk +// load data. +TEST_F(bulk_load_test, bulk_load_ingest_behind_tests) +{ + NO_FATALS(update_allow_ingest_behind("true")); + check_bulk_load(true, "oldValue", "valueAfterBulkLoad"); +}