Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part3 - replica parse metadata, verify files and update progress #475

Merged
merged 9 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load");
const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class bulk_load_constant
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
// TODO(heyuchen): add more constant in further pr
static const int32_t PROGRESS_FINISHED;
};

namespace cold_backup {
Expand Down
113 changes: 104 additions & 9 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
// parse metadata
const std::string &local_metadata_file_name =
utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA);
err = parse_bulk_load_metadata(local_metadata_file_name, _metadata);
err = parse_bulk_load_metadata(local_metadata_file_name);
if (err != ERR_OK) {
derror_replica("parse bulk load metadata failed, error = {}", err.to_string());
return err;
Expand Down Expand Up @@ -309,28 +309,102 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname,
/*out*/ bulk_load_metadata &meta)
error_code replica_bulk_loader::parse_bulk_load_metadata(const std::string &fname)
{
// TODO(heyuchen): TBD
// read file and parse file content as bulk_load_metadata
if (!utils::filesystem::file_exists(fname)) {
derror_replica("file({}) doesn't exist", fname);
return ERR_FILE_OPERATION_FAILED;
}

int64_t file_sz = 0;
if (!utils::filesystem::file_size(fname, file_sz)) {
derror_replica("get file({}) size failed", fname);
return ERR_FILE_OPERATION_FAILED;
}

std::shared_ptr<char> buf = utils::make_shared_array<char>(file_sz + 1);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
std::ifstream fin(fname, std::ifstream::in);
if (!fin.is_open()) {
derror_replica("open file({}) failed", fname);
return ERR_FILE_OPERATION_FAILED;
}
fin.read(buf.get(), file_sz);
dassert_replica(file_sz == fin.gcount(),
"read file({}) failed, file_size = {} but read size = {}",
fname,
file_sz,
fin.gcount());
fin.close();
buf.get()[fin.gcount()] = '\0';
neverchanje marked this conversation as resolved.
Show resolved Hide resolved

blob bb;
bb.assign(std::move(buf), 0, file_sz);
if (!json::json_forwarder<bulk_load_metadata>::decode(bb, _metadata)) {
derror_replica("file({}) is damaged", fname);
return ERR_CORRUPTION;
}

if (_metadata.file_total_size <= 0) {
derror_replica("bulk_load_metadata has invalid file_total_size({})",
_metadata.file_total_size);
return ERR_CORRUPTION;
}

return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
bool replica_bulk_loader::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO(heyuchen): TBD
// compare sst file metadata calculated by file and parsed by metadata
std::string local_file = utils::filesystem::path_combine(local_dir, f_meta.name);
hycdong marked this conversation as resolved.
Show resolved Hide resolved
int64_t f_size = 0;
std::string md5;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (!utils::filesystem::file_size(local_file, f_size)) {
derror_replica("verify file({}) failed, becaused failed to get file size", local_file);
return false;
}
if (utils::filesystem::md5sum(local_file, md5) != ERR_OK) {
derror_replica("verify file({}) failed, becaused failed to get file md5", local_file);
return false;
}
if (f_size != f_meta.size || md5 != f_meta.md5) {
derror_replica(
"verify file({}) failed, because file damaged, size: {} VS {}, md5: {} VS {}",
local_file,
f_size,
f_meta.size,
md5,
f_meta.md5);
return false;
}
return true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica_bulk_loader::update_bulk_load_download_progress(uint64_t file_size,
const std::string &file_name)
{
// TODO(heyuchen): TBD
// update download progress after downloading sst files succeed
if (_metadata.file_total_size <= 0) {
derror_replica("bulk_load_metadata has invalid file_total_size({})",
_metadata.file_total_size);
return;
}

ddebug_replica("update progress after downloading file({})", file_name);
_cur_downloaded_size.fetch_add(file_size);
auto total_size = static_cast<double>(_metadata.file_total_size);
auto cur_downloaded_size = static_cast<double>(_cur_downloaded_size.load());
auto cur_progress = static_cast<int32_t>((cur_downloaded_size / total_size) * 100);
_download_progress.store(cur_progress);
ddebug_replica("total_size = {}, cur_downloaded_size = {}, progress = {}",
total_size,
cur_downloaded_size,
cur_progress);

tasking::enqueue(LPC_REPLICATION_COMMON,
tracker(),
std::bind(&replica_bulk_loader::bulk_load_check_download_finish, this),
get_gpid().thread_hash());
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
Expand All @@ -342,6 +416,27 @@ void replica_bulk_loader::try_decrease_bulk_load_download_count()
_stub->_bulk_load_downloading_count.load());
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::bulk_load_check_download_finish()
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
if (_download_progress.load() == bulk_load_constant::PROGRESS_FINISHED &&
_status == bulk_load_status::BLS_DOWNLOADING) {
ddebug_replica("download all files succeed");
_status = bulk_load_status::BLS_DOWNLOADED;
try_decrease_bulk_load_download_count();
cleanup_download_task();
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::cleanup_download_task()
{
for (auto &kv : _download_task) {
CLEANUP_TASK_ALWAYS(kv.second)
}
_download_task.clear();
}

void replica_bulk_loader::clear_bulk_load_states()
{
// TODO(heyuchen): TBD
Expand Down
10 changes: 9 additions & 1 deletion src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ class replica_bulk_loader : replica_base

// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);
error_code parse_bulk_load_metadata(const std::string &fname);

// compare sst file metadata calculated by file and parsed by metadata
bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);

// update download progress after downloading sst files succeed
void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);

void try_decrease_bulk_load_download_count();
void bulk_load_check_download_finish();

void cleanup_download_task();
void clear_bulk_load_states();

void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
Expand Down Expand Up @@ -102,6 +108,8 @@ class replica_bulk_loader : replica_base

bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
bulk_load_metadata _metadata;
std::atomic<uint64_t> _cur_downloaded_size{0};
std::atomic<int32_t> _download_progress{0};
std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;
Expand Down
131 changes: 131 additions & 0 deletions src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ class replica_bulk_loader_test : public replica_test_base
return _bulk_loader->bulk_load_start_download(APP_NAME, CLUSTER, PROVIDER);
}

error_code test_parse_bulk_load_metadata(const std::string &file_path)
{
return _bulk_loader->parse_bulk_load_metadata(file_path);
}

bool test_verify_sst_files(int64_t size, const std::string &md5)
{
file_meta f_meta;
f_meta.name = FILE_NAME;
f_meta.size = size;
f_meta.md5 = md5;
return _bulk_loader->verify_sst_files(f_meta, LOCAL_DIR);
}

/// mock structure functions

void
Expand Down Expand Up @@ -101,15 +115,82 @@ class replica_bulk_loader_test : public replica_test_base
config.secondaries.emplace_back(SECONDARY2);
}

void create_local_file(const std::string &file_name)
{
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
utils::filesystem::create_file(whole_name);
std::ofstream test_file;
test_file.open(whole_name);
test_file << "write some data.\n";
test_file.close();

_file_meta.name = whole_name;
utils::filesystem::md5sum(whole_name, _file_meta.md5);
utils::filesystem::file_size(whole_name, _file_meta.size);
}

error_code create_local_metadata_file()
{
create_local_file(FILE_NAME);
_metadata.files.emplace_back(_file_meta);
_metadata.file_total_size = _file_meta.size;

std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
utils::filesystem::create_file(whole_name);
std::ofstream os(whole_name.c_str(),
(std::ofstream::out | std::ios::binary | std::ofstream::trunc));
if (!os.is_open()) {
derror("open file %s failed", whole_name.c_str());
return ERR_FILE_OPERATION_FAILED;
}

blob bb = json::json_forwarder<bulk_load_metadata>::encode(_metadata);
os.write((const char *)bb.data(), (std::streamsize)bb.length());
if (os.bad()) {
derror("write file %s failed", whole_name.c_str());
return ERR_FILE_OPERATION_FAILED;
}
os.close();

return ERR_OK;
}

bool validate_metadata()
{
auto target = _bulk_loader->_metadata;
if (target.file_total_size != _metadata.file_total_size) {
return false;
}
if (target.files.size() != _metadata.files.size()) {
return false;
}
for (int i = 0; i < target.files.size(); ++i) {
if (target.files[i].name != _metadata.files[i].name) {
return false;
}
if (target.files[i].size != _metadata.files[i].size) {
return false;
}
if (target.files[i].md5 != _metadata.files[i].md5) {
return false;
}
}
return true;
}

// helper functions
bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; }

public:
std::unique_ptr<mock_replica> _replica;
std::unique_ptr<replica_bulk_loader> _bulk_loader;

bulk_load_request _req;
group_bulk_load_request _group_req;

file_meta _file_meta;
bulk_load_metadata _metadata;

std::string APP_NAME = "replica";
std::string CLUSTER = "cluster";
std::string PROVIDER = "local_service";
Expand All @@ -119,6 +200,9 @@ class replica_bulk_loader_test : public replica_test_base
rpc_address SECONDARY = rpc_address("127.0.0.3", 34801);
rpc_address SECONDARY2 = rpc_address("127.0.0.4", 34801);
int32_t MAX_DOWNLOADING_COUNT = 5;
std::string LOCAL_DIR = bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR;
std::string METADATA = bulk_load_constant::BULK_LOAD_METADATA;
std::string FILE_NAME = "test_sst_file";
};

// on_bulk_load unit tests
Expand Down Expand Up @@ -196,5 +280,52 @@ TEST_F(replica_bulk_loader_test, start_downloading_test)
}
}

// parse_bulk_load_metadata unit tests
TEST_F(replica_bulk_loader_test, bulk_load_metadata_not_exist)
{
ASSERT_EQ(test_parse_bulk_load_metadata("path_not_exist"), ERR_FILE_OPERATION_FAILED);
}

TEST_F(replica_bulk_loader_test, bulk_load_metadata_corrupt)
{
// create file can not parse as bulk_load_metadata structure
utils::filesystem::create_directory(LOCAL_DIR);
create_local_file(METADATA);
std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
error_code ec = test_parse_bulk_load_metadata(metadata_file_name);
ASSERT_EQ(ec, ERR_CORRUPTION);
utils::filesystem::remove_path(LOCAL_DIR);
}

TEST_F(replica_bulk_loader_test, bulk_load_metadata_parse_succeed)
{
utils::filesystem::create_directory(LOCAL_DIR);
error_code ec = create_local_metadata_file();
ASSERT_EQ(ec, ERR_OK);

std::string metadata_file_name = utils::filesystem::path_combine(LOCAL_DIR, METADATA);
ec = test_parse_bulk_load_metadata(metadata_file_name);
ASSERT_EQ(ec, ERR_OK);
ASSERT_TRUE(validate_metadata());
utils::filesystem::remove_path(LOCAL_DIR);
}

// verify_sst_files unit tests
TEST_F(replica_bulk_loader_test, verify_file_failed)
{
utils::filesystem::create_directory(LOCAL_DIR);
create_local_file(FILE_NAME);
ASSERT_FALSE(test_verify_sst_files(_file_meta.size, "wrong_md5"));
utils::filesystem::remove_path(LOCAL_DIR);
}

TEST_F(replica_bulk_loader_test, verify_file_succeed)
{
utils::filesystem::create_directory(LOCAL_DIR);
create_local_file(FILE_NAME);
ASSERT_TRUE(test_verify_sst_files(_file_meta.size, _file_meta.md5));
utils::filesystem::remove_path(LOCAL_DIR);
}

} // namespace replication
} // namespace dsn