Skip to content

Commit

Permalink
refactor(backup): straighten the execution path of download_file (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Oct 22, 2020
1 parent af60c5d commit 3679a2d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 121 deletions.
175 changes: 59 additions & 116 deletions src/block_service/block_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,133 +84,76 @@ block_filesystem *block_service_manager::get_block_filesystem(const std::string
}
}

static create_file_response create_block_file_sync(const std::string &remote_file_path,
bool ignore_meta,
block_filesystem *fs,
task_tracker *tracker)
{
create_file_response ret;
fs->create_file(create_file_request{remote_file_path, ignore_meta},
TASK_CODE_EXEC_INLINED,
[&ret](const create_file_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

static download_response
download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
download_response ret;
bf->download(download_request{local_file_path, 0, -1},
TASK_CODE_EXEC_INLINED,
[&ret](const download_response &resp) { ret = resp; },
tracker);
tracker->wait_outstanding_tasks();
return ret;
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
error_code block_service_manager::download_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
block_filesystem *fs,
/*out*/ uint64_t &download_file_size)
{
error_code download_err = ERR_OK;
task_tracker tracker;

auto download_file_callback_func = [&download_err, &download_file_size](
const download_response &resp, block_file_ptr bf, const std::string &local_file_name) {
if (resp.err != ERR_OK) {
// during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
// error, however, if file damaged on remote file provider, bulk load should stop,
// return ERR_CORRUPTION instead
if (resp.err == ERR_OBJECT_NOT_FOUND) {
derror_f("download file({}) failed, file on remote file provider is damaged",
local_file_name);
download_err = ERR_CORRUPTION;
} else {
download_err = resp.err;
}
return;
}

if (resp.downloaded_size != bf->get_size()) {
derror_f(
"size not match while downloading file({}), file_size({}) vs downloaded_size({})",
bf->file_name(),
bf->get_size(),
resp.downloaded_size);
download_err = ERR_CORRUPTION;
return;
}

std::string current_md5;
error_code e = utils::filesystem::md5sum(local_file_name, current_md5);
if (e != ERR_OK) {
derror_f("calculate file({}) md5 failed", local_file_name);
download_err = e;
return;
}
if (current_md5 != bf->get_md5sum()) {
derror_f("local file({}) is different from remote file({}), download failed, md5: "
"local({}) VS remote({})",
local_file_name,
bf->file_name(),
current_md5,
bf->get_md5sum());
download_err = ERR_CORRUPTION;
return;
}
ddebug_f("download file({}) succeed, file_size = {}",
local_file_name.c_str(),
resp.downloaded_size);
download_err = ERR_OK;
download_file_size = resp.downloaded_size;
};

auto create_file_cb = [&local_dir,
&download_err,
&download_file_size,
&download_file_callback_func,
&tracker](const create_file_response &resp, const std::string &fname) {
if (resp.err != ERR_OK) {
derror_f("create file({}) failed with error({})", fname, resp.err.to_string());
download_err = resp.err;
return;
}
// local file exists
const std::string local_file_name = utils::filesystem::path_combine(local_dir, file_name);
if (utils::filesystem::file_exists(local_file_name)) {
ddebug_f("local file({}) has been downloaded", local_file_name);
return ERR_OK;
}

block_file *bf = resp.file_handle.get();
if (bf->get_md5sum().empty()) {
derror_f("file({}) doesn't exist on remote file provider", bf->file_name());
download_err = ERR_CORRUPTION;
return;
}
task_tracker tracker;

const std::string &local_file_name = utils::filesystem::path_combine(local_dir, fname);
// local file exists
if (utils::filesystem::file_exists(local_file_name)) {
std::string current_md5;
error_code e = utils::filesystem::md5sum(local_file_name, current_md5);
if (e != ERR_OK || current_md5 != bf->get_md5sum()) {
if (e != ERR_OK) {
dwarn_f("calculate file({}) md5 failed, should remove and redownload it",
local_file_name);
} else {
dwarn_f("local file({}) is different from remote file({}), md5: local({}) VS "
"remote({}), should remove and redownload it",
local_file_name,
bf->file_name(),
current_md5,
bf->get_md5sum());
}
if (!utils::filesystem::remove_path(local_file_name)) {
derror_f("failed to remove file({})", local_file_name);
download_err = e;
return;
}
} else {
download_err = ERR_OK;
download_file_size = bf->get_size();
ddebug_f("local file({}) has been downloaded, file size = {}",
local_file_name,
download_file_size);
return;
}
// Create a block_file object.
const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
auto create_resp =
create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
error_code err = create_resp.err;
if (err != ERR_OK) {
derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
return err;
}
block_file_ptr bf = create_resp.file_handle;

download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
if (resp.err != ERR_OK) {
// during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
// error, however, if file damaged on remote file provider, bulk load should stop,
// return ERR_CORRUPTION instead
if (resp.err == ERR_OBJECT_NOT_FOUND) {
derror_f("download file({}) failed, file on remote file provider is damaged",
local_file_name);
return ERR_CORRUPTION;
}
return resp.err;
}

// download or redownload file
bf->download(download_request{local_file_name, 0, -1},
TASK_CODE_EXEC_INLINED,
std::bind(download_file_callback_func,
std::placeholders::_1,
resp.file_handle,
local_file_name),
&tracker);
};

const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
fs->create_file(create_file_request{remote_file_name, false},
TASK_CODE_EXEC_INLINED,
std::bind(create_file_cb, std::placeholders::_1, file_name),
&tracker);
tracker.wait_outstanding_tasks();
return download_err;
ddebug_f(
"download file({}) succeed, file_size = {}", local_file_name.c_str(), resp.downloaded_size);
download_file_size = resp.downloaded_size;
return ERR_OK;
}

} // namespace block_service
Expand Down
5 changes: 5 additions & 0 deletions src/block_service/block_service_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class block_service_manager
// \return ERR_FS_INTERNAL: remote file system error
// \return ERR_CORRUPTION: file not exist or damaged
// if download file succeed, download_err = ERR_OK and set download_file_size
//
// TODO(wutao1): create block_filesystem_wrapper instead.
// NOTE: This function is not responsible for the correctness of the downloaded file.
// The file may be half-downloaded or corrupted due to disk failure.
// The users can compare checksums, and retry download if validation failed.
error_code download_file(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
Expand Down
6 changes: 3 additions & 3 deletions src/block_service/local/local_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,6 @@ dsn::task_ptr local_file_object::download(const download_request &req,

return tsk;
}
}
}
}
} // namespace block_service
} // namespace dist
} // namespace dsn
9 changes: 8 additions & 1 deletion src/block_service/test/block_service_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "block_service_mock.h"
#include "block_service/block_service_manager.h"
#include "block_service/local/local_service.h"

#include <fstream>

Expand Down Expand Up @@ -66,7 +67,13 @@ class block_service_manager_test : public ::testing::Test
// download_file unit tests
TEST_F(block_service_manager_test, do_download_remote_file_not_exist)
{
ASSERT_EQ(test_download_file(), ERR_CORRUPTION);
utils::filesystem::remove_path(LOCAL_DIR);
auto fs = make_unique<local_service>();
fs->initialize({LOCAL_DIR});
uint64_t download_size = 0;
error_code err = _block_service_manager.download_file(
PROVIDER, LOCAL_DIR, FILE_NAME, fs.get(), download_size);
ASSERT_EQ(err, ERR_CORRUPTION); // file does not exist
}

TEST_F(block_service_manager_test, do_download_redownload_file)
Expand Down
2 changes: 1 addition & 1 deletion src/block_service/test/config-test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type = replica
run = true
count = 1
ports = 54321
pools = THREAD_POOL_DEFAULT
pools = THREAD_POOL_DEFAULT,THREAD_POOL_LOCAL_SERVICE

[core]
tool = nativerun
Expand Down

0 comments on commit 3679a2d

Please sign in to comment.