Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add some functions for block service manager #1109

Merged
merged 4 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 149 additions & 8 deletions src/rdsn/src/block_service/block_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ static create_file_response create_block_file_sync(const std::string &remote_fil
return ret;
}

error_code block_service_manager::create_block_file(const std::string &remote_file_name,
bool ignore_meta,
block_filesystem *fs,
task_tracker *tracker,
/*out*/ create_file_response &create_resp)
{
create_resp = create_block_file_sync(remote_file_name, ignore_meta, fs, tracker);
const auto &err = create_resp.err;
if (err != ERR_OK) {
derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
}
return err;
}

static download_response
download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
Expand Down Expand Up @@ -150,19 +164,19 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
}

task_tracker tracker;

// Create a block_file object.
const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto create_resp =
create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
error_code err = create_resp.err;
create_file_response create_resp;
auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
false /*ignore file meta*/,
fs,
&tracker,
create_resp);
if (err != ERR_OK) {
derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
return err;
}
block_file_ptr bf = create_resp.file_handle;

download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
download_response resp =
download_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
// during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
// error, however, if file damaged on remote file provider, bulk load should stop,
Expand All @@ -184,6 +198,133 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
return ERR_OK;
}

static upload_response
upload_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
upload_response ret;
bf->upload(upload_request{local_file_path},
TASK_CODE_EXEC_INLINED,
[&ret](const upload_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::upload_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
block_filesystem *fs)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
false /*ignore file meta*/,
fs,
&tracker,
create_resp);
if (err != ERR_OK) {
return err;
}
// Upload file
const auto &local_file_name = utils::filesystem::path_combine(local_dir, file_name);
const upload_response &resp =
upload_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("upload file({}) succeed", local_file_name);
return ERR_OK;
}

static write_response
write_block_file_sync(const blob &value, block_file *bf, task_tracker *tracker)
{
write_response ret;
bf->write(write_request{value},
TASK_CODE_EXEC_INLINED,
[&ret](const write_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::write_file(const std::string &remote_dir,
const std::string &file_name,
const blob &value,
block_filesystem *fs)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto err =
create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
if (err != ERR_OK) {
return err;
}
// Write blob
const write_response &resp =
write_block_file_sync(value, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("write remote file({}) succeed", remote_file_name);
return ERR_OK;
}

error_code
block_service_manager::remove_path(const std::string &path, bool recursive, block_filesystem *fs)
{
task_tracker tracker;
remove_path_response ret;
fs->remove_path(remove_path_request{path, recursive},
TASK_CODE_EXEC_INLINED,
[&ret](const remove_path_response &resp) { ret = resp; },
&tracker);
tracker.wait_outstanding_tasks();
return ret.err;
}

static read_response read_block_file_sync(block_file *bf,
const uint64_t remote_pos,
const int64_t remote_length,
task_tracker *tracker)
{
read_response ret;
bf->read(read_request{remote_pos, remote_length},
TASK_CODE_EXEC_INLINED,
[&ret](const read_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

error_code block_service_manager::read_file(const std::string &remote_dir,
const std::string &file_name,
block_filesystem *fs,
blob &value)
{
task_tracker tracker;
// Create a block_file object.
create_file_response create_resp;
const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto err =
create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
if (err != ERR_OK) {
return err;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
// Read blob
const read_response &resp = read_block_file_sync(
create_resp.file_handle.get(), 0 /*remote_pos*/, -1 /*remote_length*/, &tracker);
if (resp.err != ERR_OK) {
return resp.err;
}
ddebug_f("read remote file({}) succeed", remote_file_name);
value = resp.buffer;
return ERR_OK;
}

} // namespace block_service
} // namespace dist
} // namespace dsn
38 changes: 38 additions & 0 deletions src/rdsn/src/block_service/block_service_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ class block_service_manager
~block_service_manager();
block_filesystem *get_or_create_block_filesystem(const std::string &provider);

// create block file
// \return ERR_FS_INTERNAL: remote file system error
error_code create_block_file(const std::string &remote_file_name,
bool ignore_meta,
block_filesystem *fs,
task_tracker *tracker,
/*out*/ create_file_response &resp);

// download files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
Expand All @@ -69,6 +77,36 @@ class block_service_manager
block_filesystem *fs,
/*out*/ uint64_t &download_file_size);

// upload files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code upload_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
block_filesystem *fs);

// write blob value onto remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code write_file(const std::string &remote_dir,
const std::string &file_name,
const blob &value,
block_filesystem *fs);

// remove path on remote file system
// \return ERR_OBJECT_NOT_FOUND: remove path not exist
// \return ERR_FS_INTERNAL: remote file system error
// \return ERR_DIR_NOT_EMPTY: path not empty
error_code remove_path(const std::string &path, bool recursive, block_filesystem *fs);

// read blob value from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
error_code read_file(const std::string &remote_dir,
const std::string &file_name,
block_filesystem *fs,
/*out*/ blob &value);

private:
block_service_registry &_registry_holder;

Expand Down
77 changes: 77 additions & 0 deletions src/rdsn/src/block_service/test/block_service_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ class block_service_manager_test : public ::testing::Test
PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size);
}

error_code test_upload_file()
{
return _block_service_manager.upload_file(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get());
}

error_code test_write_file()
{
return _block_service_manager.write_file(
PROVIDER, FILE_NAME, blob::create_from_bytes("test_value"), _fs.get());
}

error_code test_read_file(blob &value)
{
return _block_service_manager.read_file(PROVIDER, FILE_NAME, _fs.get(), value);
}

error_code test_remove_path(bool recursive)
{
return _block_service_manager.remove_path(REMOTE_DIR, recursive, _fs.get());
}

void create_local_file(const std::string &file_name)
{
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
Expand All @@ -63,9 +84,28 @@ class block_service_manager_test : public ::testing::Test
void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
{
std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name);

_fs->files[whole_file_name] = std::make_pair(size, md5);
}

void create_remote_dir(bool empty)
{
std::vector<ls_entry> entries;
if (!empty) {
ls_entry dir_entry;
dir_entry.entry_name = REMOTE_DIR;
dir_entry.is_directory = true;
entries.emplace_back(dir_entry);
ls_entry file_entry;
file_entry.entry_name = FILE_NAME;
file_entry.is_directory = false;
entries.emplace_back(file_entry);
}
_fs->dir_files[REMOTE_DIR] = entries;
}

void clear_remote_dir() { _fs->dir_files.clear(); }

public:
block_service_manager _block_service_manager;
std::unique_ptr<block_service_mock> _fs;
Expand All @@ -74,6 +114,7 @@ class block_service_manager_test : public ::testing::Test
std::string PROVIDER = "local_service";
std::string LOCAL_DIR = "test_dir";
std::string FILE_NAME = "test_file";
std::string REMOTE_DIR = "remote_test_dir";
};

// download_file unit tests
Expand Down Expand Up @@ -119,6 +160,42 @@ TEST_F(block_service_manager_test, do_download_succeed)
ASSERT_EQ(download_size, _file_meta.size);
}

TEST_F(block_service_manager_test, upload_file_test)
{
create_local_file(FILE_NAME);
ASSERT_EQ(test_upload_file(), ERR_OK);
}

TEST_F(block_service_manager_test, write_file_test) { ASSERT_EQ(test_write_file(), ERR_OK); }

TEST_F(block_service_manager_test, read_file_test)
{
blob value;
ASSERT_EQ(test_read_file(value), ERR_OK);
}

TEST_F(block_service_manager_test, remove_path_test)
{
struct test_struct
{
bool mock_dir;
bool dir_empty;
bool recursive;
error_code expected_err;
} tests[]{{false, false, false, ERR_OBJECT_NOT_FOUND},
{true, true, false, ERR_OK},
{true, true, true, ERR_OK},
{true, false, false, ERR_DIR_NOT_EMPTY},
{true, false, true, ERR_OK}};
for (const auto &test : tests) {
if (test.mock_dir) {
create_remote_dir(test.dir_empty);
}
ASSERT_EQ(test_remove_path(test.recursive), test.expected_err);
clear_remote_dir();
}
}

} // namespace block_service
} // namespace dist
} // namespace dsn
25 changes: 24 additions & 1 deletion src/rdsn/src/block_service/test/block_service_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ class block_service_mock : public block_filesystem
{
public:
block_service_mock()
: block_filesystem(), enable_create_file_fail(false), enable_list_dir_fail(false)
: block_filesystem(),
enable_create_file_fail(false),
enable_list_dir_fail(false),
enable_remote_path_fail(false)
{
}
virtual error_code initialize(const std::vector<std::string> &args) { return ERR_OK; }
Expand Down Expand Up @@ -201,6 +204,25 @@ class block_service_mock : public block_filesystem
const remove_path_callback &cb,
dsn::task_tracker *tracker)
{
remove_path_response resp;
if (enable_remote_path_fail) {
resp.err = ERR_MOCK_INTERNAL;
} else {
resp.err = ERR_OK;
std::string path_name = req.path;
if (dir_files.find(path_name) == dir_files.end()) {
resp.err = ERR_OBJECT_NOT_FOUND;
} else {
std::vector<ls_entry> files = dir_files[path_name];
if (!files.empty() && !req.recursive) {
resp.err = ERR_DIR_NOT_EMPTY;
} else {
dir_files.erase(path_name);
resp.err = ERR_OK;
}
}
}
cb(resp);
return task_ptr();
}

Expand All @@ -209,6 +231,7 @@ class block_service_mock : public block_filesystem
std::map<std::string, std::pair<int64_t, std::string>> files;
bool enable_create_file_fail;
bool enable_list_dir_fail;
bool enable_remote_path_fail;
};

} // namespace block_service
Expand Down