Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part3 - replica parse metadata, verify files and update progress #475

Merged
merged 9 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/dsn/utility/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
// B represents whether the directory is empty: true means empty, otherwise false
std::pair<error_code, bool> is_directory_empty(const std::string &dirname);

// Reads the content of file `fname` into `buf`.
// \return ERR_OK if the file was read,
//         ERR_FILE_OPERATION_FAILED if the file doesn't exist, its size can't be obtained,
//         or it can't be opened
error_code read_file(const std::string &fname, /*out*/ std::string &buf);

} // namespace filesystem
} // namespace utils
} // namespace dsn
33 changes: 33 additions & 0 deletions src/core/core/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <fstream>

#include <dsn/c/api_utilities.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/safe_strerror_posix.h>
Expand Down Expand Up @@ -763,6 +766,36 @@ std::pair<error_code, bool> is_directory_empty(const std::string &dirname)
}
return res;
}

// Reads the whole content of file `fname` into `buf`.
//
// \param fname path of the file to read
// \param buf   [out] resized to the file size and filled with the file's bytes;
//              left untouched when an error is returned
// \return ERR_OK on success,
//         ERR_FILE_OPERATION_FAILED if the file doesn't exist, its size can't be obtained,
//         or it can't be opened
//
// Asserts (dassert_f) that the number of bytes actually read equals the reported file size.
error_code read_file(const std::string &fname, std::string &buf)
{
    if (!file_exists(fname)) {
        derror_f("file({}) doesn't exist", fname);
        return ERR_FILE_OPERATION_FAILED;
    }

    int64_t file_sz = 0;
    if (!file_size(fname, file_sz)) {
        derror_f("get file({}) size failed", fname);
        return ERR_FILE_OPERATION_FAILED;
    }

    // Open in binary mode: the byte count below is compared with the on-disk size, and a
    // text-mode stream may translate line endings, which would change both the number of
    // bytes read and the content itself.
    std::ifstream fin(fname, std::ifstream::in | std::ifstream::binary);
    if (!fin.is_open()) {
        derror_f("open file({}) failed", fname);
        return ERR_FILE_OPERATION_FAILED;
    }

    // Only touch the output buffer once the file is known to be readable.
    buf.resize(file_sz);
    fin.read(&buf[0], file_sz);
    dassert_f(file_sz == fin.gcount(),
              "read file({}) failed, file_size = {} but read size = {}",
              fname,
              file_sz,
              fin.gcount());
    // `fin` is closed by its destructor.
    return ERR_OK;
}

} // namespace filesystem
} // namespace utils
} // namespace dsn
1 change: 1 addition & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load");
const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class bulk_load_constant
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
// TODO(heyuchen): add more constant in further pr
static const int32_t PROGRESS_FINISHED;
};

namespace cold_backup {
Expand Down
99 changes: 86 additions & 13 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

#include "replica_bulk_loader.h"

#include <fstream>

#include <dsn/dist/block_service.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -275,7 +273,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
// parse metadata
const std::string &local_metadata_file_name =
utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA);
err = parse_bulk_load_metadata(local_metadata_file_name, _metadata);
err = parse_bulk_load_metadata(local_metadata_file_name);
if (err != ERR_OK) {
derror_replica("parse bulk load metadata failed, error = {}", err.to_string());
return err;
Expand All @@ -288,7 +286,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
uint64_t f_size = 0;
error_code ec =
_replica->do_download(remote_dir, local_dir, f_meta.name, fs, f_size);
if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) {
if (ec == ERR_OK && !verify_file(f_meta, local_dir)) {
ec = ERR_CORRUPTION;
}
if (ec != ERR_OK) {
Expand All @@ -309,28 +307,82 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname,
/*out*/ bulk_load_metadata &meta)
// Reads the metadata file `fname` and decodes its content into `_metadata`.
//
// \return ERR_OK on success,
//         the error returned by utils::filesystem::read_file if the file can't be read,
//         ERR_CORRUPTION if decoding fails or the decoded file_total_size is not positive
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
{
    // Step 1: load the raw file content.
    std::string content;
    const error_code read_err = utils::filesystem::read_file(fname, content);
    if (read_err != ERR_OK) {
        derror_replica("read file {} failed, error = {}", fname, read_err);
        return read_err;
    }

    // Step 2: decode the content into the member metadata.
    blob raw = blob::create_from_bytes(std::move(content));
    const bool decoded = json::json_forwarder<bulk_load_metadata>::decode(raw, _metadata);
    if (!decoded) {
        derror_replica("file({}) is damaged", fname);
        return ERR_CORRUPTION;
    }

    // Step 3: a non-positive total size makes the metadata unusable.
    if (_metadata.file_total_size <= 0) {
        derror_replica("bulk_load_metadata has invalid file_total_size({})",
                       _metadata.file_total_size);
        return ERR_CORRUPTION;
    }

    return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
bool replica_bulk_loader::verify_file(const file_meta &f_meta, const std::string &local_dir)
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO(heyuchen): TBD
// compare sst file metadata calculated by file and parsed by metadata
const std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
int64_t f_size = 0;
if (!utils::filesystem::file_size(local_file, f_size)) {
derror_replica("verify file({}) failed, becaused failed to get file size", local_file);
return false;
}
std::string md5;
if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) {
derror_replica("verify file({}) failed, becaused failed to get file md5", local_file);
return false;
}
if (f_size != f_meta.size || md5 != f_meta.md5) {
derror_replica(
"verify file({}) failed, because file damaged, size: {} VS {}, md5: {} VS {}",
local_file,
f_size,
f_meta.size,
md5,
f_meta.md5);
return false;
}
return true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
// Adds `file_size` to the downloaded byte counter, recomputes the download progress as a
// percentage of `_metadata.file_total_size`, and enqueues check_download_finish().
//
// \param file_size size of the file that has just been downloaded
// \param file_name name of that file, used for logging only
//
// NOTE(review): the counter add, the counter load and the progress store are three separate
// atomic operations; if this can run concurrently for several files, a stale progress value
// could be stored after a newer one - confirm against the callers.
void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size,
                                                             const std::string &file_name)
{
    // Without a positive total there is nothing to compute a percentage against.
    if (_metadata.file_total_size <= 0) {
        derror_replica("bulk_load_metadata has invalid file_total_size({})",
                       _metadata.file_total_size);
        return;
    }

    ddebug_replica("update progress after downloading file({})", file_name);

    _cur_downloaded_size.fetch_add(file_size);

    // Both operands are converted to double so the ratio is not truncated before scaling.
    const double expected_bytes = static_cast<double>(_metadata.file_total_size);
    const double received_bytes = static_cast<double>(_cur_downloaded_size.load());
    const double ratio = received_bytes / expected_bytes;
    const int32_t percent = static_cast<int32_t>(ratio * 100);
    _download_progress.store(percent);

    ddebug_replica("total_size = {}, cur_downloaded_size = {}, progress = {}",
                   expected_bytes,
                   received_bytes,
                   percent);

    // The finish check is run as a separate LPC_REPLICATION_COMMON task hashed by this gpid.
    tasking::enqueue(LPC_REPLICATION_COMMON,
                     tracker(),
                     std::bind(&replica_bulk_loader::check_download_finish, this),
                     get_gpid().thread_hash());
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
Expand All @@ -342,6 +394,27 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count()
_stub->_bulk_load_downloading_count.load());
}

// ThreadPool: THREAD_POOL_REPLICATION
// Switches the status from BLS_DOWNLOADING to BLS_DOWNLOADED once the download progress
// equals PROGRESS_FINISHED, then decreases the downloading count and cleans up download tasks.
// Does nothing in any other state.
void replica_bulk_loader::check_download_finish()
{
    const bool all_downloaded =
        _download_progress.load() == bulk_load_constant::PROGRESS_FINISHED;
    if (!all_downloaded || _status != bulk_load_status::BLS_DOWNLOADING) {
        return;
    }

    ddebug_replica("download all files succeed");
    _status = bulk_load_status::BLS_DOWNLOADED;
    try_decrease_bulk_load_download_count();
    cleanup_download_task();
}

// ThreadPool: THREAD_POOL_REPLICATION
// Applies CLEANUP_TASK_ALWAYS to every task stored in _download_task, then empties the map.
void replica_bulk_loader::cleanup_download_task()
{
// kv.second is the task stored under a file name. CLEANUP_TASK_ALWAYS is a macro defined
// outside this view - presumably it cancels the task and resets the pointer (TODO confirm).
for (auto &kv : _download_task) {
CLEANUP_TASK_ALWAYS(kv.second)
}
// Drop all entries so no task reference is kept after cleanup.
_download_task.clear();
}

void replica_bulk_loader::clear_bulk_load_states()
{
// TODO(heyuchen): TBD
Expand Down
13 changes: 11 additions & 2 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,19 @@ class replica_bulk_loader : replica_base

// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);
error_code parse_bulk_load_metadata(const std::string &fname);

bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
// TODO(heyuchen): move this function into block service manager, also used by restore
// compare file metadata calculated by file and parsed by metadata
bool verify_file(const file_meta &f_meta, const std::string &local_dir);

// update download progress after downloading sst files succeed
void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);

void try_decrease_bulk_load_download_count();
void check_download_finish();

void cleanup_download_task();
void clear_bulk_load_states();

void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
Expand Down Expand Up @@ -102,6 +109,8 @@ class replica_bulk_loader : replica_base

bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
bulk_load_metadata _metadata;
std::atomic<uint64_t> _cur_downloaded_size{0};
std::atomic<int32_t> _download_progress{0};
std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;
Expand Down
Loading