diff --git a/src/block_service/block_service_manager.cpp b/src/block_service/block_service_manager.cpp index bcdf94123d..78757db402 100644 --- a/src/block_service/block_service_manager.cpp +++ b/src/block_service/block_service_manager.cpp @@ -84,6 +84,32 @@ block_filesystem *block_service_manager::get_block_filesystem(const std::string } } +static create_file_response create_block_file_sync(const std::string &remote_file_path, + bool ignore_meta, + block_filesystem *fs, + task_tracker *tracker) +{ + create_file_response ret; + fs->create_file(create_file_request{remote_file_path, ignore_meta}, + TASK_CODE_EXEC_INLINED, + [&ret](const create_file_response &resp) { ret = resp; }, + tracker); + tracker->wait_outstanding_tasks(); + return ret; +} + +static download_response +download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker) +{ + download_response ret; + bf->download(download_request{local_file_path, 0, -1}, + TASK_CODE_EXEC_INLINED, + [&ret](const download_response &resp) { ret = resp; }, + tracker); + tracker->wait_outstanding_tasks(); + return ret; +} + // ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG error_code block_service_manager::download_file(const std::string &remote_dir, const std::string &local_dir, @@ -91,126 +117,43 @@ error_code block_service_manager::download_file(const std::string &remote_dir, block_filesystem *fs, /*out*/ uint64_t &download_file_size) { - error_code download_err = ERR_OK; - task_tracker tracker; - - auto download_file_callback_func = [&download_err, &download_file_size]( - const download_response &resp, block_file_ptr bf, const std::string &local_file_name) { - if (resp.err != ERR_OK) { - // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable - // error, however, if file damaged on remote file provider, bulk load should stop, - // return ERR_CORRUPTION instead - if (resp.err == ERR_OBJECT_NOT_FOUND) { - derror_f("download file({}) failed, file on remote file provider is damaged", - local_file_name); - download_err = ERR_CORRUPTION; - } else { - download_err = resp.err; - } - return; - } - - if (resp.downloaded_size != bf->get_size()) { - derror_f( - "size not match while downloading file({}), file_size({}) vs downloaded_size({})", - bf->file_name(), - bf->get_size(), - resp.downloaded_size); - download_err = ERR_CORRUPTION; - return; - } - - std::string current_md5; - error_code e = utils::filesystem::md5sum(local_file_name, current_md5); - if (e != ERR_OK) { - derror_f("calculate file({}) md5 failed", local_file_name); - download_err = e; - return; - } - if (current_md5 != bf->get_md5sum()) { - derror_f("local file({}) is different from remote file({}), download failed, md5: " - "local({}) VS remote({})", - local_file_name, - bf->file_name(), - current_md5, - bf->get_md5sum()); - download_err = ERR_CORRUPTION; - return; - } - ddebug_f("download file({}) succeed, file_size = {}", - local_file_name.c_str(), - resp.downloaded_size); - download_err = ERR_OK; - download_file_size = resp.downloaded_size; - }; - - auto create_file_cb = [&local_dir, - &download_err, - &download_file_size, - &download_file_callback_func, - &tracker](const create_file_response &resp, const std::string &fname) { - if (resp.err != ERR_OK) { - derror_f("create file({}) failed with error({})", fname, resp.err.to_string()); - download_err = resp.err; - return; - } + // local file exists + const std::string local_file_name = utils::filesystem::path_combine(local_dir, file_name); + if (utils::filesystem::file_exists(local_file_name)) { + ddebug_f("local file({}) has been downloaded", local_file_name); + return ERR_OK; + } - block_file *bf = resp.file_handle.get(); - if (bf->get_md5sum().empty()) { - derror_f("file({}) doesn't exist on remote file provider", bf->file_name()); - download_err = ERR_CORRUPTION; - return; - } + task_tracker tracker; - const std::string &local_file_name = utils::filesystem::path_combine(local_dir, fname); - // local file exists - if (utils::filesystem::file_exists(local_file_name)) { - std::string current_md5; - error_code e = utils::filesystem::md5sum(local_file_name, current_md5); - if (e != ERR_OK || current_md5 != bf->get_md5sum()) { - if (e != ERR_OK) { - dwarn_f("calculate file({}) md5 failed, should remove and redownload it", - local_file_name); - } else { - dwarn_f("local file({}) is different from remote file({}), md5: local({}) VS " - "remote({}), should remove and redownload it", - local_file_name, - bf->file_name(), - current_md5, - bf->get_md5sum()); - } - if (!utils::filesystem::remove_path(local_file_name)) { - derror_f("failed to remove file({})", local_file_name); - download_err = e; - return; - } - } else { - download_err = ERR_OK; - download_file_size = bf->get_size(); - ddebug_f("local file({}) has been downloaded, file size = {}", - local_file_name, - download_file_size); - return; - } + // Create a block_file object. + const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); + auto create_resp = + create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker); + error_code err = create_resp.err; + if (err != ERR_OK) { + derror_f("create file({}) failed with error({})", remote_file_name, err.to_string()); + return err; + } + block_file_ptr bf = create_resp.file_handle; + + download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker); + if (resp.err != ERR_OK) { + // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable + // error, however, if file damaged on remote file provider, bulk load should stop, + // return ERR_CORRUPTION instead + if (resp.err == ERR_OBJECT_NOT_FOUND) { + derror_f("download file({}) failed, file on remote file provider is damaged", + local_file_name); + return ERR_CORRUPTION; } + return resp.err; + } - // download or redownload file - bf->download(download_request{local_file_name, 0, -1}, - TASK_CODE_EXEC_INLINED, - std::bind(download_file_callback_func, - std::placeholders::_1, - resp.file_handle, - local_file_name), - &tracker); - }; - - const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); - fs->create_file(create_file_request{remote_file_name, false}, - TASK_CODE_EXEC_INLINED, - std::bind(create_file_cb, std::placeholders::_1, file_name), - &tracker); - tracker.wait_outstanding_tasks(); - return download_err; + ddebug_f( + "download file({}) succeed, file_size = {}", local_file_name.c_str(), resp.downloaded_size); + download_file_size = resp.downloaded_size; + return ERR_OK; } } // namespace block_service diff --git a/src/block_service/block_service_manager.h b/src/block_service/block_service_manager.h index cb7d5e578f..d59d034af4 100644 --- a/src/block_service/block_service_manager.h +++ b/src/block_service/block_service_manager.h @@ -33,6 +33,11 @@ class block_service_manager // \return ERR_FS_INTERNAL: remote file system error // \return ERR_CORRUPTION: file not exist or damaged // if download file succeed, download_err = ERR_OK and set download_file_size + // + // TODO(wutao1): create block_filesystem_wrapper instead. + // NOTE: This function is not responsible for the correctness of the downloaded file. + // The file may be half-downloaded or corrupted due to disk failure. + // The users can compare checksums, and retry download if validation failed. error_code download_file(const std::string &remote_dir, const std::string &local_dir, const std::string &file_name, diff --git a/src/block_service/local/local_service.cpp b/src/block_service/local/local_service.cpp index 0863c6fcf5..9120e033fd 100644 --- a/src/block_service/local/local_service.cpp +++ b/src/block_service/local/local_service.cpp @@ -522,6 +522,6 @@ dsn::task_ptr local_file_object::download(const download_request &req, return tsk; } -} -} -} +} // namespace block_service +} // namespace dist +} // namespace dsn diff --git a/src/block_service/test/block_service_manager_test.cpp b/src/block_service/test/block_service_manager_test.cpp index 2065849281..e8ffe5607a 100644 --- a/src/block_service/test/block_service_manager_test.cpp +++ b/src/block_service/test/block_service_manager_test.cpp @@ -4,6 +4,7 @@ #include "block_service_mock.h" #include "block_service/block_service_manager.h" +#include "block_service/local/local_service.h" #include @@ -66,7 +67,13 @@ class block_service_manager_test : public ::testing::Test // download_file unit tests TEST_F(block_service_manager_test, do_download_remote_file_not_exist) { - ASSERT_EQ(test_download_file(), ERR_CORRUPTION); + utils::filesystem::remove_path(LOCAL_DIR); + auto fs = make_unique(); + fs->initialize({LOCAL_DIR}); + uint64_t download_size = 0; + error_code err = _block_service_manager.download_file( + PROVIDER, LOCAL_DIR, FILE_NAME, fs.get(), download_size); + ASSERT_EQ(err, ERR_CORRUPTION); // file does not exist } TEST_F(block_service_manager_test, do_download_redownload_file) diff --git a/src/block_service/test/config-test.ini b/src/block_service/test/config-test.ini index 877bc221ed..160ae9fa69 100644 --- a/src/block_service/test/config-test.ini +++ b/src/block_service/test/config-test.ini @@ -7,7 +7,7 @@ type = replica run = true count = 1 ports = 54321 -pools = THREAD_POOL_DEFAULT +pools = THREAD_POOL_DEFAULT,THREAD_POOL_LOCAL_SERVICE [core] tool = nativerun