From d5c97d4004a5b603c4c024c7d991b1ef476cd13d Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Mon, 25 May 2020 18:29:46 +0800 Subject: [PATCH] feat(bulk-load): bulk load download part2 - replica download files (#471) --- src/dist/replication/lib/replica.h | 1 + .../replication/lib/replica_file_provider.cpp | 130 +++++++++++++++++- .../test/replica_test/unit_test/mock_utils.h | 8 ++ .../unit_test/replica_file_provider_test.cpp | 96 +++++++++++++ 4 files changed, 231 insertions(+), 4 deletions(-) create mode 100644 src/dist/replication/test/replica_test/unit_test/replica_file_provider_test.cpp diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 6917ea92f5..847846f852 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -455,6 +455,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_test; friend class replica_backup_manager; friend class replica_bulk_loader; + friend class replica_file_provider_test; // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; diff --git a/src/dist/replication/lib/replica_file_provider.cpp b/src/dist/replication/lib/replica_file_provider.cpp index 88f1744f38..b8dad81a94 100644 --- a/src/dist/replication/lib/replica_file_provider.cpp +++ b/src/dist/replication/lib/replica_file_provider.cpp @@ -20,10 +20,132 @@ error_code replica::do_download(const std::string &remote_dir, dist::block_service::block_filesystem *fs, /*out*/ uint64_t &download_file_size) { - // TODO(heyuchen): TBD - // download files from remote provider - // this function can also be used in restore - return ERR_OK; + error_code download_err = ERR_OK; + task_tracker tracker; + + auto download_file_callback_func = [this, &download_err, &download_file_size]( + const dist::block_service::download_response &resp, + dist::block_service::block_file_ptr bf, + const std::string &local_file_name) { + if (resp.err != ERR_OK) { + // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable + // error, however, if file damaged on remote file provider, bulk load should stop, + // return ERR_CORRUPTION instead + if (resp.err == ERR_OBJECT_NOT_FOUND) { + derror_replica("download file({}) failed, file on remote file provider is damaged", + local_file_name); + download_err = ERR_CORRUPTION; + } else { + download_err = resp.err; + } + return; + } + + if (resp.downloaded_size != bf->get_size()) { + derror_replica( + "size not match while downloading file({}), file_size({}) vs downloaded_size({})", + bf->file_name(), + bf->get_size(), + resp.downloaded_size); + download_err = ERR_CORRUPTION; + return; + } + + std::string current_md5; + error_code e = utils::filesystem::md5sum(local_file_name, current_md5); + if (e != ERR_OK) { + derror_replica("calculate file({}) md5 failed", local_file_name); + download_err = e; + return; + } + if (current_md5 != bf->get_md5sum()) { + derror_replica( + "local file({}) is different from remote file({}), download failed, md5: " + "local({}) VS remote({})", + local_file_name, + bf->file_name(), + current_md5, + bf->get_md5sum()); + download_err = ERR_CORRUPTION; + return; + } + ddebug_replica("download file({}) succeed, file_size = {}", + local_file_name.c_str(), + resp.downloaded_size); + download_err = ERR_OK; + download_file_size = resp.downloaded_size; + }; + + auto create_file_cb = [this, + &local_dir, + &download_err, + &download_file_size, + &download_file_callback_func, + &tracker](const dist::block_service::create_file_response &resp, + const std::string &fname) { + if (resp.err != ERR_OK) { + derror_replica("create file({}) failed with error({})", fname, resp.err.to_string()); + download_err = resp.err; + return; + } + + dist::block_service::block_file *bf = resp.file_handle.get(); + if (bf->get_md5sum().empty()) { + derror_replica("file({}) doesn't exist on remote file provider", bf->file_name()); + download_err = ERR_CORRUPTION; + return; + } + + const std::string &local_file_name = utils::filesystem::path_combine(local_dir, fname); + // local file exists + if (utils::filesystem::file_exists(local_file_name)) { + std::string current_md5; + error_code e = utils::filesystem::md5sum(local_file_name, current_md5); + if (e != ERR_OK || current_md5 != bf->get_md5sum()) { + if (e != ERR_OK) { + dwarn_replica("calculate file({}) md5 failed, should remove and redownload it", + local_file_name); + } else { + dwarn_replica( + "local file({}) is different from remote file({}), md5: local({}) VS " + "remote({}), should remove and redownload it", + local_file_name, + bf->file_name(), + current_md5, + bf->get_md5sum()); + } + if (!utils::filesystem::remove_path(local_file_name)) { + derror_replica("failed to remove file({})", local_file_name); + download_err = e; + return; + } + } else { + download_err = ERR_OK; + download_file_size = bf->get_size(); + ddebug_replica("local file({}) has been downloaded, file size = {}", + local_file_name, + download_file_size); + return; + } + } + + // download or redownload file + bf->download(dist::block_service::download_request{local_file_name, 0, -1}, + TASK_CODE_EXEC_INLINED, + std::bind(download_file_callback_func, + std::placeholders::_1, + resp.file_handle, + local_file_name), + &tracker); + }; + + const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); + fs->create_file(dist::block_service::create_file_request{remote_file_name, false}, + TASK_CODE_EXEC_INLINED, + std::bind(create_file_cb, std::placeholders::_1, file_name), + &tracker); + tracker.wait_outstanding_tasks(); + return download_err; } } // namespace replication diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 06e3f6581d..c8e15ad9bc 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -26,6 +26,8 @@ #pragma once +#include "block_service_mock.h" + #include #include #include @@ -244,6 +246,12 @@ class mock_replica_stub : public replica_stub { _bulk_load_downloading_count.store(count); } + + std::unique_ptr get_block_filesystem() + { + std::unique_ptr block_service = make_unique(); + return block_service; + } }; class mock_log_file : public log_file diff --git a/src/dist/replication/test/replica_test/unit_test/replica_file_provider_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_file_provider_test.cpp new file mode 100644 index 0000000000..b40ff66776 --- /dev/null +++ b/src/dist/replication/test/replica_test/unit_test/replica_file_provider_test.cpp @@ -0,0 +1,96 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "replica_test_base.h" + +#include + +#include + +namespace dsn { +namespace replication { + +class replica_file_provider_test : public replica_test_base +{ +public: + replica_file_provider_test() + { + _replica = create_mock_replica(stub.get()); + _fs = stub->get_block_filesystem(); + utils::filesystem::create_directory(LOCAL_DIR); + } + + ~replica_file_provider_test() { utils::filesystem::remove_path(LOCAL_DIR); } + +public: + error_code test_do_download() + { + uint64_t download_size = 0; + return _replica->do_download(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size); + } + + void create_local_file(const std::string &file_name) + { + std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name); + utils::filesystem::create_file(whole_name); + std::ofstream test_file; + test_file.open(whole_name); + test_file << "write some data.\n"; + test_file.close(); + + _file_meta.name = whole_name; + utils::filesystem::md5sum(whole_name, _file_meta.md5); + utils::filesystem::file_size(whole_name, _file_meta.size); + } + + void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5) + { + std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name); + _fs->files[whole_file_name] = std::make_pair(size, md5); + } + +public: + std::unique_ptr _replica; + std::unique_ptr _fs; + + file_meta _file_meta; + std::string PROVIDER = "local_service"; + std::string LOCAL_DIR = "test_dir"; + std::string FILE_NAME = "test_file"; +}; + +// do_download unit tests +TEST_F(replica_file_provider_test, do_download_remote_file_not_exist) +{ + ASSERT_EQ(test_do_download(), ERR_CORRUPTION); +} + +TEST_F(replica_file_provider_test, do_download_redownload_file) +{ + // local file exists, but md5 not matched with remote file + // expected to remove old local file and redownload it + create_local_file(FILE_NAME); + create_remote_file(FILE_NAME, 2333, "md5_not_match"); + ASSERT_EQ(test_do_download(), ERR_OK); +} + +TEST_F(replica_file_provider_test, do_download_file_exist) +{ + create_local_file(FILE_NAME); + create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5); + ASSERT_EQ(test_do_download(), ERR_OK); +} + +TEST_F(replica_file_provider_test, do_download_succeed) +{ + create_local_file(FILE_NAME); + create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5); + // remove local file to mock condition that file not existed + std::string file_name = utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME); + utils::filesystem::remove_path(file_name); + ASSERT_EQ(test_do_download(), ERR_OK); +} + +} // namespace replication +} // namespace dsn