diff --git a/include/dsn/utility/filesystem.h b/include/dsn/utility/filesystem.h index 9da8a4954d..85524e727a 100644 --- a/include/dsn/utility/filesystem.h +++ b/include/dsn/utility/filesystem.h @@ -129,6 +129,8 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result); // B is represent wheter the directory is empty, true means empty, otherwise false std::pair is_directory_empty(const std::string &dirname); +error_code read_file(const std::string &fname, /*out*/ std::string &buf); + } // namespace filesystem } // namespace utils } // namespace dsn diff --git a/src/core/core/filesystem.cpp b/src/core/core/filesystem.cpp index 4acc443229..5e658a9958 100644 --- a/src/core/core/filesystem.cpp +++ b/src/core/core/filesystem.cpp @@ -33,7 +33,10 @@ * xxxx-xx-xx, author, fix bug about xxx */ +#include + #include +#include #include #include #include @@ -763,6 +766,36 @@ std::pair is_directory_empty(const std::string &dirname) } return res; } + +error_code read_file(const std::string &fname, std::string &buf) +{ + if (!file_exists(fname)) { + derror_f("file({}) doesn't exist", fname); + return ERR_FILE_OPERATION_FAILED; + } + + int64_t file_sz = 0; + if (!file_size(fname, file_sz)) { + derror_f("get file({}) size failed", fname); + return ERR_FILE_OPERATION_FAILED; + } + + buf.resize(file_sz); + std::ifstream fin(fname, std::ifstream::in); + if (!fin.is_open()) { + derror_f("open file({}) failed", fname); + return ERR_FILE_OPERATION_FAILED; + } + fin.read(&buf[0], file_sz); + dassert_f(file_sz == fin.gcount(), + "read file({}) failed, file_size = {} but read size = {}", + fname, + file_sz, + fin.gcount()); + fin.close(); + return ERR_OK; +} + } // namespace filesystem } // namespace utils } // namespace dsn diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index bc1da85f51..34f6e96a94 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -650,6 +650,7 @@ const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5; const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata"); const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load"); +const int32_t bulk_load_constant::PROGRESS_FINISHED = 100; namespace cold_backup { std::string get_policy_path(const std::string &root, const std::string &policy_name) diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 98d29606e7..bd8ef6b1e0 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -165,7 +165,7 @@ class bulk_load_constant static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL; static const std::string BULK_LOAD_METADATA; static const std::string BULK_LOAD_LOCAL_ROOT_DIR; - // TODO(heyuchen): add more constant in further pr + static const int32_t PROGRESS_FINISHED; }; namespace cold_backup { diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp index d3065bed75..b0b93d6b68 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp @@ -4,8 +4,6 @@ #include "replica_bulk_loader.h" -#include - #include #include #include @@ -275,7 +273,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name, // parse metadata const std::string &local_metadata_file_name = utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA); - err = parse_bulk_load_metadata(local_metadata_file_name, _metadata); + err = parse_bulk_load_metadata(local_metadata_file_name); if (err != ERR_OK) { derror_replica("parse bulk load metadata failed, error = {}", err.to_string()); return err; @@ -288,7 +286,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name, uint64_t f_size = 0; error_code ec = _replica->do_download(remote_dir, local_dir, f_meta.name, fs, f_size); - if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) { + if (ec == ERR_OK && !verify_file(f_meta, local_dir)) { ec = ERR_CORRUPTION; } if (ec != ERR_OK) { @@ -309,19 +307,54 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name, } // ThreadPool: THREAD_POOL_REPLICATION -error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname, - /*out*/ bulk_load_metadata &meta) +error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname) { - // TODO(heyuchen): TBD - // read file and parse file content as bulk_load_metadata + std::string buf; + error_code ec = utils::filesystem::read_file(fname, buf); + if (ec != ERR_OK) { + derror_replica("read file {} failed, error = {}", fname, ec); + return ec; + } + + blob bb = blob::create_from_bytes(std::move(buf)); + if (!json::json_forwarder::decode(bb, _metadata)) { + derror_replica("file({}) is damaged", fname); + return ERR_CORRUPTION; + } + + if (_metadata.file_total_size <= 0) { + derror_replica("bulk_load_metadata has invalid file_total_size({})", + _metadata.file_total_size); + return ERR_CORRUPTION; + } + return ERR_OK; } // ThreadPool: THREAD_POOL_REPLICATION_LONG -bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir) +bool replica_bulk_loader::verify_file(const file_meta &f_meta, const std::string &local_dir) { - // TODO(heyuchen): TBD - // compare sst file metadata calculated by file and parsed by metadata + const std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name); + int64_t f_size = 0; + if (!utils::filesystem::file_size(local_file, f_size)) { + derror_replica("verify file({}) failed, becaused failed to get file size", local_file); + return false; + } + std::string md5; + if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) { + derror_replica("verify file({}) failed, becaused failed to get file md5", local_file); + return false; + } + if (f_size != f_meta.size || md5 != f_meta.md5) { + derror_replica( + "verify file({}) failed, because file damaged, size: {} VS {}, md5: {} VS {}", + local_file, + f_size, + f_meta.size, + md5, + f_meta.md5); + return false; + } return true; } @@ -329,8 +362,27 @@ bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::s void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name) { - // TODO(heyuchen): TBD - // update download progress after downloading sst files succeed + if (_metadata.file_total_size <= 0) { + derror_replica("bulk_load_metadata has invalid file_total_size({})", + _metadata.file_total_size); + return; + } + + ddebug_replica("update progress after downloading file({})", file_name); + _cur_downloaded_size.fetch_add(file_size); + auto total_size = static_cast(_metadata.file_total_size); + auto cur_downloaded_size = static_cast(_cur_downloaded_size.load()); + auto cur_progress = static_cast((cur_downloaded_size / total_size) * 100); + _download_progress.store(cur_progress); + ddebug_replica("total_size = {}, cur_downloaded_size = {}, progress = {}", + total_size, + cur_downloaded_size, + cur_progress); + + tasking::enqueue(LPC_REPLICATION_COMMON, + tracker(), + std::bind(&replica_bulk_loader::check_download_finish, this), + get_gpid().thread_hash()); } // ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG @@ -342,6 +394,27 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count() _stub->_bulk_load_downloading_count.load()); } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_bulk_loader::check_download_finish() +{ + if (_download_progress.load() == bulk_load_constant::PROGRESS_FINISHED && + _status == bulk_load_status::BLS_DOWNLOADING) { + ddebug_replica("download all files succeed"); + _status = bulk_load_status::BLS_DOWNLOADED; + try_decrease_bulk_load_download_count(); + cleanup_download_task(); + } +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_bulk_loader::cleanup_download_task() +{ + for (auto &kv : _download_task) { + CLEANUP_TASK_ALWAYS(kv.second) + } + _download_task.clear(); +} + void replica_bulk_loader::clear_bulk_load_states() { // TODO(heyuchen): TBD diff --git a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h index 0bf9fded90..1bab20a55a 100644 --- a/src/dist/replication/lib/bulk_load/replica_bulk_loader.h +++ b/src/dist/replication/lib/bulk_load/replica_bulk_loader.h @@ -51,12 +51,19 @@ class replica_bulk_loader : replica_base // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed // \return ERR_CORRUPTION: parse failed - error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta); + error_code parse_bulk_load_metadata(const std::string &fname); - bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir); + // TODO(heyuchen): move this function into block service manager, also used by restore + // compare file metadata calculated by file and parsed by metadata + bool verify_file(const file_meta &f_meta, const std::string &local_dir); + + // update download progress after downloading sst files succeed void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name); + void try_decrease_bulk_load_download_count(); + void check_download_finish(); + void cleanup_download_task(); void clear_bulk_load_states(); void report_bulk_load_states_to_meta(bulk_load_status::type remote_status, @@ -102,6 +109,8 @@ class replica_bulk_loader : replica_base bulk_load_status::type _status{bulk_load_status::BLS_INVALID}; bulk_load_metadata _metadata; + std::atomic _cur_downloaded_size{0}; + std::atomic _download_progress{0}; std::atomic _download_status{ERR_OK}; // file_name -> downloading task std::map _download_task; diff --git a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp index e6970b6b1a..0b77fb8793 100644 --- a/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp +++ b/src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp @@ -47,6 +47,20 @@ class replica_bulk_loader_test : public replica_test_base return _bulk_loader->bulk_load_start_download(APP_NAME, CLUSTER, PROVIDER); } + error_code test_parse_bulk_load_metadata(const std::string &file_path) + { + return _bulk_loader->parse_bulk_load_metadata(file_path); + } + + bool test_verify_file(int64_t size, const std::string &md5) + { + file_meta f_meta; + f_meta.name = FILE_NAME; + f_meta.size = size; + f_meta.md5 = md5; + return _bulk_loader->verify_file(f_meta, LOCAL_DIR); + } + /// mock structure functions void @@ -101,15 +115,82 @@ class replica_bulk_loader_test : public replica_test_base config.secondaries.emplace_back(SECONDARY2); } + void create_local_file(const std::string &file_name) + { + std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name); + utils::filesystem::create_file(whole_name); + std::ofstream test_file; + test_file.open(whole_name); + test_file << "write some data.\n"; + test_file.close(); + + _file_meta.name = whole_name; + utils::filesystem::md5sum(whole_name, _file_meta.md5); + utils::filesystem::file_size(whole_name, _file_meta.size); + } + + error_code create_local_metadata_file() + { + create_local_file(FILE_NAME); + _metadata.files.emplace_back(_file_meta); + _metadata.file_total_size = _file_meta.size; + + std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); + utils::filesystem::create_file(whole_name); + std::ofstream os(whole_name.c_str(), + (std::ofstream::out | std::ios::binary | std::ofstream::trunc)); + if (!os.is_open()) { + derror("open file %s failed", whole_name.c_str()); + return ERR_FILE_OPERATION_FAILED; + } + + blob bb = json::json_forwarder::encode(_metadata); + os.write((const char *)bb.data(), (std::streamsize)bb.length()); + if (os.bad()) { + derror("write file %s failed", whole_name.c_str()); + return ERR_FILE_OPERATION_FAILED; + } + os.close(); + + return ERR_OK; + } + + bool validate_metadata() + { + auto target = _bulk_loader->_metadata; + if (target.file_total_size != _metadata.file_total_size) { + return false; + } + if (target.files.size() != _metadata.files.size()) { + return false; + } + for (int i = 0; i < target.files.size(); ++i) { + if (target.files[i].name != _metadata.files[i].name) { + return false; + } + if (target.files[i].size != _metadata.files[i].size) { + return false; + } + if (target.files[i].md5 != _metadata.files[i].md5) { + return false; + } + } + return true; + } + // helper functions bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; } public: std::unique_ptr _replica; std::unique_ptr _bulk_loader; + bulk_load_request _req; group_bulk_load_request _group_req; + file_meta _file_meta; + bulk_load_metadata _metadata; + std::string APP_NAME = "replica"; std::string CLUSTER = "cluster"; std::string PROVIDER = "local_service"; @@ -119,6 +200,9 @@ class replica_bulk_loader_test : public replica_test_base rpc_address SECONDARY = rpc_address("127.0.0.3", 34801); rpc_address SECONDARY2 = rpc_address("127.0.0.4", 34801); int32_t MAX_DOWNLOADING_COUNT = 5; + std::string LOCAL_DIR = bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR; + std::string METADATA = bulk_load_constant::BULK_LOAD_METADATA; + std::string FILE_NAME = "test_sst_file"; }; // on_bulk_load unit tests @@ -196,5 +280,53 @@ TEST_F(replica_bulk_loader_test, start_downloading_test) } } +// parse_bulk_load_metadata unit tests +TEST_F(replica_bulk_loader_test, bulk_load_metadata_not_exist) +{ + ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED); +} + +TEST_F(replica_bulk_loader_test, bulk_load_metadata_corrupt) +{ + // create file can not parse as bulk_load_metadata structure + utils::filesystem::create_directory(LOCAL_DIR); + create_local_file(METADATA); + std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); + error_code ec = test_parse_bulk_load_metadata(metadata_file_name); + ASSERT_EQ(ec, ERR_CORRUPTION); + utils::filesystem::remove_path(LOCAL_DIR); +} + +TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed) +{ + utils::filesystem::create_directory(LOCAL_DIR); + error_code ec = create_local_metadata_file(); + ASSERT_EQ(ec, ERR_OK); + + std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA); + ec = test_parse_bulk_load_metadata(metadata_file_name); + ASSERT_EQ(ec, ERR_OK); + ASSERT_TRUE(validate_metadata()); + utils::filesystem::remove_path(LOCAL_DIR); +} + +// TODO(heyuchen): move tests into block service +// verify_file unit tests +TEST_F(replica_bulk_loader_test, verify_file_failed) +{ + utils::filesystem::create_directory(LOCAL_DIR); + create_local_file(FILE_NAME); + ASSERT_FALSE(test_verify_file(_file_meta.size, "wrong_md5")); + utils::filesystem::remove_path(LOCAL_DIR); +} + +TEST_F(replica_bulk_loader_test, verify_file_succeed) +{ + utils::filesystem::create_directory(LOCAL_DIR); + create_local_file(FILE_NAME); + ASSERT_TRUE(test_verify_file(_file_meta.size, _file_meta.md5)); + utils::filesystem::remove_path(LOCAL_DIR); +} + } // namespace replication } // namespace dsn