Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part3 - replica parse metadata, verify files and update progress #475

Merged
merged 9 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/dsn/utility/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
// B is represent wheter the directory is empty, true means empty, otherwise false
std::pair<error_code, bool> is_directory_empty(const std::string &dirname);

error_code read_file(const std::string &fname, /*out*/ std::string &buf);

} // namespace filesystem
} // namespace utils
} // namespace dsn
33 changes: 33 additions & 0 deletions src/core/core/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <fstream>

#include <dsn/c/api_utilities.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/safe_strerror_posix.h>
Expand Down Expand Up @@ -763,6 +766,36 @@ std::pair<error_code, bool> is_directory_empty(const std::string &dirname)
}
return res;
}

error_code read_file(const std::string &fname, std::string &buf)
{
if (!file_exists(fname)) {
derror_f("file({}) doesn't exist", fname);
return ERR_FILE_OPERATION_FAILED;
}

int64_t file_sz = 0;
if (!utils::filesystem::file_size(fname, file_sz)) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
derror_f("get file({}) size failed", fname);
return ERR_FILE_OPERATION_FAILED;
}

buf.resize(file_sz);
std::ifstream fin(fname, std::ifstream::in);
if (!fin.is_open()) {
derror_f("open file({}) failed", fname);
return ERR_FILE_OPERATION_FAILED;
}
fin.read(&buf[0], file_sz);
dassert_f(file_sz == fin.gcount(),
"read file({}) failed, file_size = {} but read size = {}",
fname,
file_sz,
fin.gcount());
fin.close();
return ERR_OK;
}

} // namespace filesystem
} // namespace utils
} // namespace dsn
1 change: 1 addition & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load");
const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class bulk_load_constant
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
// TODO(heyuchen): add more constant in further pr
static const int32_t PROGRESS_FINISHED;
};

namespace cold_backup {
Expand Down
99 changes: 86 additions & 13 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

#include "replica_bulk_loader.h"

#include <fstream>

#include <dsn/dist/block_service.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -275,7 +273,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
// parse metadata
const std::string &local_metadata_file_name =
utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA);
err = parse_bulk_load_metadata(local_metadata_file_name, _metadata);
err = parse_bulk_load_metadata(local_metadata_file_name);
if (err != ERR_OK) {
derror_replica("parse bulk load metadata failed, error = {}", err.to_string());
return err;
Expand All @@ -288,7 +286,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
uint64_t f_size = 0;
error_code ec =
_replica->do_download(remote_dir, local_dir, f_meta.name, fs, f_size);
if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) {
if (ec == ERR_OK && !verify_file(f_meta, local_dir)) {
ec = ERR_CORRUPTION;
}
if (ec != ERR_OK) {
Expand All @@ -309,28 +307,82 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname,
/*out*/ bulk_load_metadata &meta)
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
{
// TODO(heyuchen): TBD
// read file and parse file content as bulk_load_metadata
std::string buf;
error_code ec = utils::filesystem::read_file(fname, buf);
if (ec != ERR_OK) {
derror_replica("read file {} failed, error = {}", fname, ec);
return ec;
}

blob bb = blob::create_from_bytes(std::move(buf));
if (!json::json_forwarder<bulk_load_metadata>::decode(bb, _metadata)) {
derror_replica("file({}) is damaged", fname);
return ERR_CORRUPTION;
}

if (_metadata.file_total_size <= 0) {
derror_replica("bulk_load_metadata has invalid file_total_size({})",
_metadata.file_total_size);
return ERR_CORRUPTION;
}

return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
bool replica_bulk_loader::verify_file(const file_meta &f_meta, const std::string &local_dir)
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO(heyuchen): TBD
// compare sst file metadata calculated by file and parsed by metadata
const std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
int64_t f_size = 0;
if (!utils::filesystem::file_size(local_file, f_size)) {
derror_replica("verify file({}) failed, becaused failed to get file size", local_file);
return false;
}
std::string md5;
if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) {
derror_replica("verify file({}) failed, becaused failed to get file md5", local_file);
return false;
}
if (f_size != f_meta.size || md5 != f_meta.md5) {
derror_replica(
"verify file({}) failed, because file damaged, size: {} VS {}, md5: {} VS {}",
local_file,
f_size,
f_meta.size,
md5,
f_meta.md5);
return false;
}
return true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size,
const std::string &file_name)
{
// TODO(heyuchen): TBD
// update download progress after downloading sst files succeed
if (_metadata.file_total_size <= 0) {
derror_replica("bulk_load_metadata has invalid file_total_size({})",
_metadata.file_total_size);
return;
}

ddebug_replica("update progress after downloading file({})", file_name);
_cur_downloaded_size.fetch_add(file_size);
auto total_size = static_cast<double>(_metadata.file_total_size);
auto cur_downloaded_size = static_cast<double>(_cur_downloaded_size.load());
auto cur_progress = static_cast<int32_t>((cur_downloaded_size / total_size) * 100);
_download_progress.store(cur_progress);
ddebug_replica("total_size = {}, cur_downloaded_size = {}, progress = {}",
total_size,
cur_downloaded_size,
cur_progress);

tasking::enqueue(LPC_REPLICATION_COMMON,
tracker(),
std::bind(&replica_bulk_loader::check_download_finish, this),
get_gpid().thread_hash());
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
Expand All @@ -342,6 +394,27 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count()
_stub->_bulk_load_downloading_count.load());
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::check_download_finish()
{
if (_download_progress.load() == bulk_load_constant::PROGRESS_FINISHED &&
_status == bulk_load_status::BLS_DOWNLOADING) {
ddebug_replica("download all files succeed");
_status = bulk_load_status::BLS_DOWNLOADED;
try_decrease_bulk_load_download_count();
cleanup_download_task();
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::cleanup_download_task()
{
for (auto &kv : _download_task) {
CLEANUP_TASK_ALWAYS(kv.second)
}
_download_task.clear();
}

void replica_bulk_loader::clear_bulk_load_states()
{
// TODO(heyuchen): TBD
Expand Down
13 changes: 11 additions & 2 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,19 @@ class replica_bulk_loader : replica_base

// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);
error_code parse_bulk_load_metadata(const std::string &fname);

bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
// TODO(heyuchen): move this function into block service manager, also used by restore
// compare file metadata calculated by file and parsed by metadata
bool verify_file(const file_meta &f_meta, const std::string &local_dir);

// update download progress after downloading sst files succeed
void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);

void try_decrease_bulk_load_download_count();
void check_download_finish();

void cleanup_download_task();
void clear_bulk_load_states();

void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
Expand Down Expand Up @@ -102,6 +109,8 @@ class replica_bulk_loader : replica_base

bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
bulk_load_metadata _metadata;
std::atomic<uint64_t> _cur_downloaded_size{0};
std::atomic<int32_t> _download_progress{0};
std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;
Expand Down
Loading