diff --git a/src/rdsn/src/block_service/block_service_manager.cpp b/src/rdsn/src/block_service/block_service_manager.cpp index fb63e6b976..fccacc495e 100644 --- a/src/rdsn/src/block_service/block_service_manager.cpp +++ b/src/rdsn/src/block_service/block_service_manager.cpp @@ -112,6 +112,20 @@ static create_file_response create_block_file_sync(const std::string &remote_fil return ret; } +error_code block_service_manager::create_block_file(const std::string &remote_file_name, + bool ignore_meta, + block_filesystem *fs, + task_tracker *tracker, + /*out*/ create_file_response &create_resp) +{ + create_resp = create_block_file_sync(remote_file_name, ignore_meta, fs, tracker); + const auto &err = create_resp.err; + if (err != ERR_OK) { + derror_f("create file({}) failed with error({})", remote_file_name, err.to_string()); + } + return err; +} + static download_response download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker) { @@ -150,19 +164,19 @@ error_code block_service_manager::download_file(const std::string &remote_dir, } task_tracker tracker; - // Create a block_file object. - const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); - auto create_resp = - create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker); - error_code err = create_resp.err; + create_file_response create_resp; + auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name), + false /*ignore file meta*/, + fs, + &tracker, + create_resp); if (err != ERR_OK) { - derror_f("create file({}) failed with error({})", remote_file_name, err.to_string()); return err; } - block_file_ptr bf = create_resp.file_handle; - download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker); + download_response resp = + download_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker); if (resp.err != ERR_OK) { // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable // error, however, if file damaged on remote file provider, bulk load should stop, @@ -184,6 +198,133 @@ error_code block_service_manager::download_file(const std::string &remote_dir, return ERR_OK; } +static upload_response +upload_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker) +{ + upload_response ret; + bf->upload(upload_request{local_file_path}, + TASK_CODE_EXEC_INLINED, + [&ret](const upload_response &resp) { ret = resp; }, + tracker); + tracker->wait_outstanding_tasks(); + return ret; +} + +error_code block_service_manager::upload_file(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + block_filesystem *fs) +{ + task_tracker tracker; + // Create a block_file object. + create_file_response create_resp; + auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name), + false /*ignore file meta*/, + fs, + &tracker, + create_resp); + if (err != ERR_OK) { + return err; + } + // Upload file + const auto &local_file_name = utils::filesystem::path_combine(local_dir, file_name); + const upload_response &resp = + upload_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker); + if (resp.err != ERR_OK) { + return resp.err; + } + ddebug_f("upload file({}) succeed", local_file_name); + return ERR_OK; +} + +static write_response +write_block_file_sync(const blob &value, block_file *bf, task_tracker *tracker) +{ + write_response ret; + bf->write(write_request{value}, + TASK_CODE_EXEC_INLINED, + [&ret](const write_response &resp) { ret = resp; }, + tracker); + tracker->wait_outstanding_tasks(); + return ret; +} + +error_code block_service_manager::write_file(const std::string &remote_dir, + const std::string &file_name, + const blob &value, + block_filesystem *fs) +{ + task_tracker tracker; + // Create a block_file object. + create_file_response create_resp; + const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); + auto err = + create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp); + if (err != ERR_OK) { + return err; + } + // Write blob + const write_response &resp = + write_block_file_sync(value, create_resp.file_handle.get(), &tracker); + if (resp.err != ERR_OK) { + return resp.err; + } + ddebug_f("write remote file({}) succeed", remote_file_name); + return ERR_OK; +} + +error_code +block_service_manager::remove_path(const std::string &path, bool recursive, block_filesystem *fs) +{ + task_tracker tracker; + remove_path_response ret; + fs->remove_path(remove_path_request{path, recursive}, + TASK_CODE_EXEC_INLINED, + [&ret](const remove_path_response &resp) { ret = resp; }, + &tracker); + tracker.wait_outstanding_tasks(); + return ret.err; +} + +static read_response read_block_file_sync(block_file *bf, + const uint64_t remote_pos, + const int64_t remote_length, + task_tracker *tracker) +{ + read_response ret; + bf->read(read_request{remote_pos, remote_length}, + TASK_CODE_EXEC_INLINED, + [&ret](const read_response &resp) { ret = resp; }, + tracker); + tracker->wait_outstanding_tasks(); + return ret; +} + +error_code block_service_manager::read_file(const std::string &remote_dir, + const std::string &file_name, + block_filesystem *fs, + blob &value) +{ + task_tracker tracker; + // Create a block_file object. + create_file_response create_resp; + const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name); + auto err = + create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp); + if (err != ERR_OK) { + return err; + } + // Read blob + const read_response &resp = read_block_file_sync( + create_resp.file_handle.get(), 0 /*remote_pos*/, -1 /*remote_length*/, &tracker); + if (resp.err != ERR_OK) { + return resp.err; + } + ddebug_f("read remote file({}) succeed", remote_file_name); + value = resp.buffer; + return ERR_OK; +} + } // namespace block_service } // namespace dist } // namespace dsn diff --git a/src/rdsn/src/block_service/block_service_manager.h b/src/rdsn/src/block_service/block_service_manager.h index 65b0b7294b..ec67db1bd6 100644 --- a/src/rdsn/src/block_service/block_service_manager.h +++ b/src/rdsn/src/block_service/block_service_manager.h @@ -45,6 +45,14 @@ class block_service_manager ~block_service_manager(); block_filesystem *get_or_create_block_filesystem(const std::string &provider); + // create block file + // \return ERR_FS_INTERNAL: remote file system error + error_code create_block_file(const std::string &remote_file_name, + bool ignore_meta, + block_filesystem *fs, + task_tracker *tracker, + /*out*/ create_file_response &resp); + // download files from remote file system // \return ERR_FILE_OPERATION_FAILED: local file system error // \return ERR_FS_INTERNAL: remote file system error @@ -69,6 +77,36 @@ class block_service_manager block_filesystem *fs, /*out*/ uint64_t &download_file_size); + // upload files from remote file system + // \return ERR_FILE_OPERATION_FAILED: local file system error + // \return ERR_FS_INTERNAL: remote file system error + error_code upload_file(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + block_filesystem *fs); + + // write blob value onto remote file system + // \return ERR_FILE_OPERATION_FAILED: local file system error + // \return ERR_FS_INTERNAL: remote file system error + error_code write_file(const std::string &remote_dir, + const std::string &file_name, + const blob &value, + block_filesystem *fs); + + // remove path on remote file system + // \return ERR_OBJECT_NOT_FOUND: remove path not exist + // \return ERR_FS_INTERNAL: remote file system error + // \return ERR_DIR_NOT_EMPTY: path not empty + error_code remove_path(const std::string &path, bool recursive, block_filesystem *fs); + + // read blob value from remote file system + // \return ERR_FILE_OPERATION_FAILED: local file system error + // \return ERR_FS_INTERNAL: remote file system error + error_code read_file(const std::string &remote_dir, + const std::string &file_name, + block_filesystem *fs, + /*out*/ blob &value); + private: block_service_registry &_registry_holder; diff --git a/src/rdsn/src/block_service/test/block_service_manager_test.cpp b/src/rdsn/src/block_service/test/block_service_manager_test.cpp index e2269de6b7..7c5f863c4b 100644 --- a/src/rdsn/src/block_service/test/block_service_manager_test.cpp +++ b/src/rdsn/src/block_service/test/block_service_manager_test.cpp @@ -46,6 +46,27 @@ class block_service_manager_test : public ::testing::Test PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size); } + error_code test_upload_file() + { + return _block_service_manager.upload_file(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get()); + } + + error_code test_write_file() + { + return _block_service_manager.write_file( + PROVIDER, FILE_NAME, blob::create_from_bytes("test_value"), _fs.get()); + } + + error_code test_read_file(blob &value) + { + return _block_service_manager.read_file(PROVIDER, FILE_NAME, _fs.get(), value); + } + + error_code test_remove_path(bool recursive) + { + return _block_service_manager.remove_path(REMOTE_DIR, recursive, _fs.get()); + } + void create_local_file(const std::string &file_name) { std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name); @@ -63,9 +84,28 @@ class block_service_manager_test : public ::testing::Test void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5) { std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name); + _fs->files[whole_file_name] = std::make_pair(size, md5); } + void create_remote_dir(bool empty) + { + std::vector entries; + if (!empty) { + ls_entry dir_entry; + dir_entry.entry_name = REMOTE_DIR; + dir_entry.is_directory = true; + entries.emplace_back(dir_entry); + ls_entry file_entry; + file_entry.entry_name = FILE_NAME; + file_entry.is_directory = false; + entries.emplace_back(file_entry); + } + _fs->dir_files[REMOTE_DIR] = entries; + } + + void clear_remote_dir() { _fs->dir_files.clear(); } + public: block_service_manager _block_service_manager; std::unique_ptr _fs; @@ -74,6 +114,7 @@ class block_service_manager_test : public ::testing::Test std::string PROVIDER = "local_service"; std::string LOCAL_DIR = "test_dir"; std::string FILE_NAME = "test_file"; + std::string REMOTE_DIR = "remote_test_dir"; }; // download_file unit tests @@ -119,6 +160,42 @@ TEST_F(block_service_manager_test, do_download_succeed) ASSERT_EQ(download_size, _file_meta.size); } +TEST_F(block_service_manager_test, upload_file_test) +{ + create_local_file(FILE_NAME); + ASSERT_EQ(test_upload_file(), ERR_OK); +} + +TEST_F(block_service_manager_test, write_file_test) { ASSERT_EQ(test_write_file(), ERR_OK); } + +TEST_F(block_service_manager_test, read_file_test) +{ + blob value; + ASSERT_EQ(test_read_file(value), ERR_OK); +} + +TEST_F(block_service_manager_test, remove_path_test) +{ + struct test_struct + { + bool mock_dir; + bool dir_empty; + bool recursive; + error_code expected_err; + } tests[]{{false, false, false, ERR_OBJECT_NOT_FOUND}, + {true, true, false, ERR_OK}, + {true, true, true, ERR_OK}, + {true, false, false, ERR_DIR_NOT_EMPTY}, + {true, false, true, ERR_OK}}; + for (const auto &test : tests) { + if (test.mock_dir) { + create_remote_dir(test.dir_empty); + } + ASSERT_EQ(test_remove_path(test.recursive), test.expected_err); + clear_remote_dir(); + } +} + } // namespace block_service } // namespace dist } // namespace dsn diff --git a/src/rdsn/src/block_service/test/block_service_mock.h b/src/rdsn/src/block_service/test/block_service_mock.h index 5c899a39db..dc23063b14 100644 --- a/src/rdsn/src/block_service/test/block_service_mock.h +++ b/src/rdsn/src/block_service/test/block_service_mock.h @@ -145,7 +145,10 @@ class block_service_mock : public block_filesystem { public: block_service_mock() - : block_filesystem(), enable_create_file_fail(false), enable_list_dir_fail(false) + : block_filesystem(), + enable_create_file_fail(false), + enable_list_dir_fail(false), + enable_remote_path_fail(false) { } virtual error_code initialize(const std::vector &args) { return ERR_OK; } @@ -201,6 +204,25 @@ class block_service_mock : public block_filesystem const remove_path_callback &cb, dsn::task_tracker *tracker) { + remove_path_response resp; + if (enable_remote_path_fail) { + resp.err = ERR_MOCK_INTERNAL; + } else { + resp.err = ERR_OK; + std::string path_name = req.path; + if (dir_files.find(path_name) == dir_files.end()) { + resp.err = ERR_OBJECT_NOT_FOUND; + } else { + std::vector files = dir_files[path_name]; + if (!files.empty() && !req.recursive) { + resp.err = ERR_DIR_NOT_EMPTY; + } else { + dir_files.erase(path_name); + resp.err = ERR_OK; + } + } + } + cb(resp); return task_ptr(); } @@ -209,6 +231,7 @@ class block_service_mock : public block_filesystem std::map> files; bool enable_create_file_fail; bool enable_list_dir_fail; + bool enable_remote_path_fail; }; } // namespace block_service