Skip to content

Commit

Permalink
feat: add some functions for block service manager (#1109)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Aug 12, 2022
1 parent efd840a commit 22d48c3
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 9 deletions.
157 changes: 149 additions & 8 deletions src/rdsn/src/block_service/block_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ static create_file_response create_block_file_sync(const std::string &remote_fil
return ret;
}

error_code block_service_manager::create_block_file(const std::string &remote_file_name,
bool ignore_meta,
block_filesystem *fs,
task_tracker *tracker,
/*out*/ create_file_response &create_resp)
{
create_resp = create_block_file_sync(remote_file_name, ignore_meta, fs, tracker);
const auto &err = create_resp.err;
if (err != ERR_OK) {
derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
}
return err;
}

static download_response
download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
Expand Down Expand Up @@ -150,19 +164,19 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
}

task_tracker tracker;

// Create a block_file object.
const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto create_resp =
create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
error_code err = create_resp.err;
create_file_response create_resp;
auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
false /*ignore file meta*/,
fs,
&tracker,
create_resp);
if (err != ERR_OK) {
derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
return err;
}
block_file_ptr bf = create_resp.file_handle;

download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
download_response resp =
download_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
// during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
// error, however, if file damaged on remote file provider, bulk load should stop,
Expand All @@ -184,6 +198,133 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
return ERR_OK;
}

static upload_response
upload_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
upload_response ret;
bf->upload(upload_request{local_file_path},
TASK_CODE_EXEC_INLINED,
[&ret](const upload_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::upload_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
block_filesystem *fs)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
false /*ignore file meta*/,
fs,
&tracker,
create_resp);
if (err != ERR_OK) {
return err;
}
// Upload file
const auto &local_file_name = utils::filesystem::path_combine(local_dir, file_name);
const upload_response &resp =
upload_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("upload file({}) succeed", local_file_name);
return ERR_OK;
}

static write_response
write_block_file_sync(const blob &value, block_file *bf, task_tracker *tracker)
{
write_response ret;
bf->write(write_request{value},
TASK_CODE_EXEC_INLINED,
[&ret](const write_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::write_file(const std::string &remote_dir,
const std::string &file_name,
const blob &value,
block_filesystem *fs)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto err =
create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
if (err != ERR_OK) {
return err;
}
// Write blob
const write_response &resp =
write_block_file_sync(value, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("write remote file({}) succeed", remote_file_name);
return ERR_OK;
}

error_code
block_service_manager::remove_path(const std::string &path, bool recursive, block_filesystem *fs)
{
task_tracker tracker;
remove_path_response ret;
fs->remove_path(remove_path_request{path, recursive},
TASK_CODE_EXEC_INLINED,
[&ret](const remove_path_response &resp) { ret = resp; },
&tracker);
tracker.wait_outstanding_tasks();
return ret.err;
}

static read_response read_block_file_sync(block_file *bf,
const uint64_t remote_pos,
const int64_t remote_length,
task_tracker *tracker)
{
read_response ret;
bf->read(read_request{remote_pos, remote_length},
TASK_CODE_EXEC_INLINED,
[&ret](const read_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::read_file(const std::string &remote_dir,
const std::string &file_name,
block_filesystem *fs,
blob &value)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto err =
create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
if (err != ERR_OK) {
return err;
}
// Read blob
const read_response &resp = read_block_file_sync(
create_resp.file_handle.get(), 0 /*remote_pos*/, -1 /*remote_length*/, &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("read remote file({}) succeed", remote_file_name);
value = resp.buffer;
return ERR_OK;
}

} // namespace block_service
} // namespace dist
} // namespace dsn
38 changes: 38 additions & 0 deletions src/rdsn/src/block_service/block_service_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ class block_service_manager
~block_service_manager();
block_filesystem *get_or_create_block_filesystem(const std::string &provider);

// create block file
// \return ERR_FS_INTERNAL: remote file system error
error_code create_block_file(const std::string &remote_file_name,
bool ignore_meta,
block_filesystem *fs,
task_tracker *tracker,
/*out*/ create_file_response &resp);

// download files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
Expand All @@ -69,6 +77,36 @@ class block_service_manager
block_filesystem *fs,
/*out*/ uint64_t &download_file_size);

// upload files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code upload_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
block_filesystem *fs);

// write blob value onto remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code write_file(const std::string &remote_dir,
const std::string &file_name,
const blob &value,
block_filesystem *fs);

// remove path on remote file system
// \return ERR_OBJECT_NOT_FOUND: remove path not exist
// \return ERR_FS_INTERNAL: remote file system error
// \return ERR_DIR_NOT_EMPTY: path not empty
error_code remove_path(const std::string &path, bool recursive, block_filesystem *fs);

// read blob value from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code read_file(const std::string &remote_dir,
const std::string &file_name,
block_filesystem *fs,
/*out*/ blob &value);

private:
block_service_registry &_registry_holder;

Expand Down
77 changes: 77 additions & 0 deletions src/rdsn/src/block_service/test/block_service_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ class block_service_manager_test : public ::testing::Test
PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size);
}

error_code test_upload_file()
{
return _block_service_manager.upload_file(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get());
}

error_code test_write_file()
{
return _block_service_manager.write_file(
PROVIDER, FILE_NAME, blob::create_from_bytes("test_value"), _fs.get());
}

error_code test_read_file(blob &value)
{
return _block_service_manager.read_file(PROVIDER, FILE_NAME, _fs.get(), value);
}

error_code test_remove_path(bool recursive)
{
return _block_service_manager.remove_path(REMOTE_DIR, recursive, _fs.get());
}

void create_local_file(const std::string &file_name)
{
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
Expand All @@ -63,9 +84,28 @@ class block_service_manager_test : public ::testing::Test
void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
{
std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name);

_fs->files[whole_file_name] = std::make_pair(size, md5);
}

void create_remote_dir(bool empty)
{
std::vector<ls_entry> entries;
if (!empty) {
ls_entry dir_entry;
dir_entry.entry_name = REMOTE_DIR;
dir_entry.is_directory = true;
entries.emplace_back(dir_entry);
ls_entry file_entry;
file_entry.entry_name = FILE_NAME;
file_entry.is_directory = false;
entries.emplace_back(file_entry);
}
_fs->dir_files[REMOTE_DIR] = entries;
}

void clear_remote_dir() { _fs->dir_files.clear(); }

public:
block_service_manager _block_service_manager;
std::unique_ptr<block_service_mock> _fs;
Expand All @@ -74,6 +114,7 @@ class block_service_manager_test : public ::testing::Test
std::string PROVIDER = "local_service";
std::string LOCAL_DIR = "test_dir";
std::string FILE_NAME = "test_file";
std::string REMOTE_DIR = "remote_test_dir";
};

// download_file unit tests
Expand Down Expand Up @@ -119,6 +160,42 @@ TEST_F(block_service_manager_test, do_download_succeed)
ASSERT_EQ(download_size, _file_meta.size);
}

TEST_F(block_service_manager_test, upload_file_test)
{
create_local_file(FILE_NAME);
ASSERT_EQ(test_upload_file(), ERR_OK);
}

TEST_F(block_service_manager_test, write_file_test) { ASSERT_EQ(test_write_file(), ERR_OK); }

TEST_F(block_service_manager_test, read_file_test)
{
blob value;
ASSERT_EQ(test_read_file(value), ERR_OK);
}

TEST_F(block_service_manager_test, remove_path_test)
{
struct test_struct
{
bool mock_dir;
bool dir_empty;
bool recursive;
error_code expected_err;
} tests[]{{false, false, false, ERR_OBJECT_NOT_FOUND},
{true, true, false, ERR_OK},
{true, true, true, ERR_OK},
{true, false, false, ERR_DIR_NOT_EMPTY},
{true, false, true, ERR_OK}};
for (const auto &test : tests) {
if (test.mock_dir) {
create_remote_dir(test.dir_empty);
}
ASSERT_EQ(test_remove_path(test.recursive), test.expected_err);
clear_remote_dir();
}
}

} // namespace block_service
} // namespace dist
} // namespace dsn
25 changes: 24 additions & 1 deletion src/rdsn/src/block_service/test/block_service_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ class block_service_mock : public block_filesystem
{
public:
block_service_mock()
: block_filesystem(), enable_create_file_fail(false), enable_list_dir_fail(false)
: block_filesystem(),
enable_create_file_fail(false),
enable_list_dir_fail(false),
enable_remote_path_fail(false)
{
}
virtual error_code initialize(const std::vector<std::string> &args) { return ERR_OK; }
Expand Down Expand Up @@ -201,6 +204,25 @@ class block_service_mock : public block_filesystem
const remove_path_callback &cb,
dsn::task_tracker *tracker)
{
remove_path_response resp;
if (enable_remote_path_fail) {
resp.err = ERR_MOCK_INTERNAL;
} else {
resp.err = ERR_OK;
std::string path_name = req.path;
if (dir_files.find(path_name) == dir_files.end()) {
resp.err = ERR_OBJECT_NOT_FOUND;
} else {
std::vector<ls_entry> files = dir_files[path_name];
if (!files.empty() && !req.recursive) {
resp.err = ERR_DIR_NOT_EMPTY;
} else {
dir_files.erase(path_name);
resp.err = ERR_OK;
}
}
}
cb(resp);
return task_ptr();
}

Expand All @@ -209,6 +231,7 @@ class block_service_mock : public block_filesystem
std::map<std::string, std::pair<int64_t, std::string>> files;
bool enable_create_file_fail;
bool enable_list_dir_fail;
bool enable_remote_path_fail;
};

} // namespace block_service
Expand Down

0 comments on commit 22d48c3

Please sign in to comment.