Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk-load): bulk load download part2 - replica download files (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored May 25, 2020
1 parent 5aabc57 commit d5c97d4
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_test;
friend class replica_backup_manager;
friend class replica_bulk_loader;
friend class replica_file_provider_test;

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down
130 changes: 126 additions & 4 deletions src/dist/replication/lib/replica_file_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,132 @@ error_code replica::do_download(const std::string &remote_dir,
dist::block_service::block_filesystem *fs,
/*out*/ uint64_t &download_file_size)
{
// TODO(heyuchen): TBD
// download files from remote provider
// this function can also be used in restore
return ERR_OK;
// Result of the whole operation: written by the callbacks below and returned to the caller.
error_code download_err = ERR_OK;
// Tracker handed to both the create_file and the download requests; it is waited on before
// download_err is returned.
task_tracker tracker;

// Completion handler for block_file::download().
// Validates the downloaded file (transfer error, size, md5) and publishes the outcome through
// the captured `download_err` / `download_file_size`.
//   resp            - response of the download request
//   bf              - handle of the remote file, source of the expected size and md5
//   local_file_name - path the file was downloaded to
auto download_file_callback_func = [this, &download_err, &download_file_size](
    const dist::block_service::download_response &resp,
    dist::block_service::block_file_ptr bf,
    const std::string &local_file_name) {
    // ERR_OBJECT_NOT_FOUND is treated as a recoverable error elsewhere in the bulk load
    // process, but a file that is damaged on the remote provider must stop bulk load,
    // so it is surfaced as ERR_CORRUPTION here.
    if (resp.err == ERR_OBJECT_NOT_FOUND) {
        derror_replica("download file({}) failed, file on remote file provider is damaged",
                       local_file_name);
        download_err = ERR_CORRUPTION;
        return;
    }
    // any other transfer failure is passed through unchanged
    if (resp.err != ERR_OK) {
        download_err = resp.err;
        return;
    }

    // the number of bytes received must equal the size recorded for the remote file
    if (resp.downloaded_size != bf->get_size()) {
        derror_replica(
            "size not match while downloading file({}), file_size({}) vs downloaded_size({})",
            bf->file_name(),
            bf->get_size(),
            resp.downloaded_size);
        download_err = ERR_CORRUPTION;
        return;
    }

    // the content on local disk must hash to the md5 recorded for the remote file
    std::string local_md5;
    error_code md5_err = utils::filesystem::md5sum(local_file_name, local_md5);
    if (md5_err != ERR_OK) {
        derror_replica("calculate file({}) md5 failed", local_file_name);
        download_err = md5_err;
        return;
    }
    if (local_md5 != bf->get_md5sum()) {
        derror_replica(
            "local file({}) is different from remote file({}), download failed, md5: "
            "local({}) VS remote({})",
            local_file_name,
            bf->file_name(),
            local_md5,
            bf->get_md5sum());
        download_err = ERR_CORRUPTION;
        return;
    }

    // every check passed: report success and the size that was transferred
    ddebug_replica("download file({}) succeed, file_size = {}",
                   local_file_name.c_str(),
                   resp.downloaded_size);
    download_file_size = resp.downloaded_size;
    download_err = ERR_OK;
};

// Completion handler for block_filesystem::create_file() on the remote file.
// Decides whether `fname` has to be downloaded:
//   - remote handle could not be created      -> download_err = resp.err
//   - remote file has no md5 (does not exist) -> download_err = ERR_CORRUPTION
//   - local copy exists with a matching md5   -> success, nothing is downloaded
//   - local copy is stale or unreadable       -> remove it, then download
//   - no local copy                           -> download
// The outcome is published through the captured `download_err` / `download_file_size`;
// when a download is started, `download_file_callback_func` reports the final result.
//   resp  - response of the create_file request, carries the remote file handle
//   fname - file name relative to both the remote directory and `local_dir`
auto create_file_cb = [this,
                       &local_dir,
                       &download_err,
                       &download_file_size,
                       &download_file_callback_func,
                       &tracker](const dist::block_service::create_file_response &resp,
                                 const std::string &fname) {
    if (resp.err != ERR_OK) {
        derror_replica("create file({}) failed with error({})", fname, resp.err.to_string());
        download_err = resp.err;
        return;
    }

    dist::block_service::block_file *bf = resp.file_handle.get();
    // an empty md5 is how a missing remote file shows up here
    if (bf->get_md5sum().empty()) {
        derror_replica("file({}) doesn't exist on remote file provider", bf->file_name());
        download_err = ERR_CORRUPTION;
        return;
    }

    const std::string local_file_name = utils::filesystem::path_combine(local_dir, fname);
    // local file exists
    if (utils::filesystem::file_exists(local_file_name)) {
        std::string current_md5;
        const error_code e = utils::filesystem::md5sum(local_file_name, current_md5);
        if (e == ERR_OK && current_md5 == bf->get_md5sum()) {
            // the local copy is already identical to the remote file, skip the download
            download_err = ERR_OK;
            download_file_size = bf->get_size();
            ddebug_replica("local file({}) has been downloaded, file size = {}",
                           local_file_name,
                           download_file_size);
            return;
        }

        if (e != ERR_OK) {
            dwarn_replica("calculate file({}) md5 failed, should remove and redownload it",
                          local_file_name);
        } else {
            dwarn_replica("local file({}) is different from remote file({}), md5: local({}) VS "
                          "remote({}), should remove and redownload it",
                          local_file_name,
                          bf->file_name(),
                          current_md5,
                          bf->get_md5sum());
        }
        if (!utils::filesystem::remove_path(local_file_name)) {
            derror_replica("failed to remove file({})", local_file_name);
            // `e` is ERR_OK when the md5 was computed but did not match; never report
            // success when the stale file could not be removed
            download_err = (e != ERR_OK) ? e : ERR_FILE_OPERATION_FAILED;
            return;
        }
    }

    // download or redownload file
    bf->download(dist::block_service::download_request{local_file_name, 0, -1},
                 TASK_CODE_EXEC_INLINED,
                 std::bind(download_file_callback_func,
                           std::placeholders::_1,
                           resp.file_handle,
                           local_file_name),
                 &tracker);
};

// Build the remote path and start the create_file -> download callback chain.
const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
fs->create_file(dist::block_service::create_file_request{remote_file_name, false},
TASK_CODE_EXEC_INLINED,
std::bind(create_file_cb, std::placeholders::_1, file_name),
&tracker);
// download_err and download_file_size are written by the callbacks above, which were all
// registered with `tracker`, so wait on it before handing the result back.
// NOTE(review): presumably wait_outstanding_tasks() blocks until those callbacks have
// finished - confirm against task_tracker.
tracker.wait_outstanding_tasks();
return download_err;
}

} // namespace replication
Expand Down
8 changes: 8 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#pragma once

#include "block_service_mock.h"

#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/replication/mutation_duplicator.h>
#include <dsn/dist/fmt_logging.h>
Expand Down Expand Up @@ -244,6 +246,12 @@ class mock_replica_stub : public replica_stub
{
_bulk_load_downloading_count.store(count);
}

// Creates a new block_service_mock and transfers its ownership to the caller.
std::unique_ptr<block_service_mock> get_block_filesystem()
{
    return make_unique<block_service_mock>();
}
};

class mock_log_file : public log_file
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "replica_test_base.h"

#include <fstream>

#include <gtest/gtest.h>

namespace dsn {
namespace replication {

// Test fixture for replica::do_download().
// Owns a mock replica and a mock block filesystem, and uses LOCAL_DIR as a scratch directory
// that is created in the constructor and removed in the destructor.
class replica_file_provider_test : public replica_test_base
{
public:
    replica_file_provider_test()
        : _replica(create_mock_replica(stub.get())), _fs(stub->get_block_filesystem())
    {
        utils::filesystem::create_directory(LOCAL_DIR);
    }

    ~replica_file_provider_test() { utils::filesystem::remove_path(LOCAL_DIR); }

public:
    // Runs do_download() for FILE_NAME from PROVIDER into LOCAL_DIR and returns its error code;
    // the reported download size is discarded.
    error_code test_do_download()
    {
        uint64_t ignored_size = 0;
        return _replica->do_download(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), ignored_size);
    }

    // Writes a small file named `file_name` under LOCAL_DIR and records its path, md5 and
    // size in _file_meta.
    void create_local_file(const std::string &file_name)
    {
        const std::string local_path = utils::filesystem::path_combine(LOCAL_DIR, file_name);
        utils::filesystem::create_file(local_path);
        {
            // the stream is closed at the end of this scope, before md5/size are read
            std::ofstream out(local_path);
            out << "write some data.\n";
        }

        _file_meta.name = local_path;
        utils::filesystem::md5sum(local_path, _file_meta.md5);
        utils::filesystem::file_size(local_path, _file_meta.size);
    }

    // Registers `file_name` under PROVIDER in the mock filesystem with the given size and md5.
    void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
    {
        const std::string remote_path = utils::filesystem::path_combine(PROVIDER, file_name);
        _fs->files[remote_path] = std::make_pair(size, md5);
    }

public:
    std::unique_ptr<mock_replica> _replica;
    std::unique_ptr<block_service_mock> _fs;

    // metadata of the file most recently written by create_local_file()
    file_meta _file_meta;
    // remote directory, local directory and file name used by every test case
    std::string PROVIDER = "local_service";
    std::string LOCAL_DIR = "test_dir";
    std::string FILE_NAME = "test_file";
};

// do_download unit tests
TEST_F(replica_file_provider_test, do_download_remote_file_not_exist)
{
    // no remote file is registered in the mock filesystem; ERR_CORRUPTION is expected
    const auto err = test_do_download();
    ASSERT_EQ(err, ERR_CORRUPTION);
}

TEST_F(replica_file_provider_test, do_download_redownload_file)
{
    // A local copy exists, but the remote file is registered with a different md5:
    // the stale local file is expected to be removed and downloaded again.
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, 2333, "md5_not_match");

    const auto err = test_do_download();
    ASSERT_EQ(err, ERR_OK);
}

TEST_F(replica_file_provider_test, do_download_file_exist)
{
    // the remote file is registered with the size and md5 of the existing local file
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);

    const auto err = test_do_download();
    ASSERT_EQ(err, ERR_OK);
}

TEST_F(replica_file_provider_test, do_download_succeed)
{
    // create the local file only to capture its size and md5 for the remote registration
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);

    // remove local file to mock condition that file not existed
    utils::filesystem::remove_path(utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME));

    const auto err = test_do_download();
    ASSERT_EQ(err, ERR_OK);
}

} // namespace replication
} // namespace dsn

0 comments on commit d5c97d4

Please sign in to comment.