This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part2 - replica download files #471

Merged
merged 9 commits into from
May 25, 2020
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.h
@@ -455,6 +455,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
    friend class replica_test;
    friend class replica_backup_manager;
    friend class replica_bulk_loader;
    friend class replica_file_provider_test;

    // replica configuration, updated by update_local_configuration ONLY
    replica_configuration _config;
130 changes: 126 additions & 4 deletions src/dist/replication/lib/replica_file_provider.cpp
@@ -20,10 +20,132 @@ error_code replica::do_download(const std::string &remote_dir,
                                dist::block_service::block_filesystem *fs,
                                /*out*/ uint64_t &download_file_size)
{
    // TODO(heyuchen): TBD
    // download files from remote provider
    // this function can also be used in restore
    return ERR_OK;
    error_code download_err = ERR_OK;

@neverchanje neverchanje May 22, 2020

Nothing in this code relates to the replica except for the ddebug_replica logging. It should be moved to the dist::block_service module rather than living in replica.

struct download_options {
  std::string &remote_dir;
  std::string &local_dir;
  // TODO(): add download rate limiter here.
  // uint64_t max_download_size;
};

error_s download_remote_files(const download_options& options, dist::block_service::block_filesystem *fs, /*out*/ uint64_t &download_file_size) {
  return error_s::make();
}

Under this implementation you can't print logs in do_download; however, you can return an error with a string (error_s) on failure.

error_code replica::do_download(const std::string &remote_dir,
                                const std::string &local_dir,
                                const std::string &file_name,
                                dist::block_service::block_filesystem *fs,
                                /*out*/ uint64_t &download_file_size) {
  // arguments are elided in this sketch
  error_s err = download_remote_files();
  if (!err.is_ok()) {
    derror_replica(err);
  } else {
    ddebug_replica("download succeed with total file size {}", download_file_size);
  }
  return err.code();
}
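
A minimal, self-contained sketch of the split suggested above: the download routine reports failure through a returned value that carries a message, and only the caller logs. The names `status`, `download_remote_files_sketch`, and `do_download_sketch` are hypothetical stand-ins, not rDSN's `error_s` or the real block service API, and the download itself is simulated.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical stand-in for an error type that carries a message (not rDSN's error_s).
struct status
{
    bool ok;
    std::string message;
};

// Hypothetical options struct; values are stored by copy to keep the sketch simple.
struct download_options
{
    std::string remote_dir;
    std::string local_dir;
};

// Module-level routine: it does no logging and describes failures in the returned status.
// The "download" is simulated: an empty remote_dir is treated as a failure.
inline status download_remote_files_sketch(const download_options &options,
                                           /*out*/ uint64_t &download_file_size)
{
    if (options.remote_dir.empty()) {
        return status{false, "remote_dir is empty"};
    }
    download_file_size = 2333; // placeholder size for the simulated download
    return status{true, ""};
}

// Caller: owns the logging, as do_download would inside the replica.
inline bool do_download_sketch(const download_options &options,
                               /*out*/ uint64_t &download_file_size)
{
    status s = download_remote_files_sketch(options, download_file_size);
    if (!s.ok) {
        std::cerr << "download failed: " << s.message << "\n";
        return false;
    }
    std::cout << "download succeed with total file size " << download_file_size << "\n";
    return true;
}
```

Keeping the message inside the returned value means the lower-level routine needs no access to replica-specific logging macros, which is the point of the proposed move.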


@hycdong hycdong May 25, 2020

I think the current implementation is at an intermediate stage, so it is okay to leave the do_download function in replica for now.
If I move the download function into the block_service module, I should also move its unit tests there. However, cold backup has already written a block_file_mock in the replica unit tests for the backup tests. Moving it into block_service would mean updating the backup unit test code, which will itself be refactored in a later pull request, so refactoring it now is not worthwhile.
In my view, we should move all functions related to the file provider, such as downloading and uploading files, into the block_service module. That can be done after the backup and restore code has been refactored.

Contributor Author

As we discussed offline, I will refactor this code in the next pull request.

    task_tracker tracker;

    auto download_file_callback_func = [this, &download_err, &download_file_size](
        const dist::block_service::download_response &resp,
        dist::block_service::block_file_ptr bf,
        const std::string &local_file_name) {
        if (resp.err != ERR_OK) {
            // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
            // error, however, if file damaged on remote file provider, bulk load should stop,
            // return ERR_CORRUPTION instead
            if (resp.err == ERR_OBJECT_NOT_FOUND) {
                derror_replica("download file({}) failed, file on remote file provider is damaged",
                               local_file_name);
                download_err = ERR_CORRUPTION;
            } else {
                download_err = resp.err;
            }
            return;
        }

        if (resp.downloaded_size != bf->get_size()) {
            derror_replica(
                "size not match while downloading file({}), file_size({}) vs downloaded_size({})",
                bf->file_name(),
                bf->get_size(),
                resp.downloaded_size);
            download_err = ERR_CORRUPTION;
            return;
        }

        std::string current_md5;
        error_code e = utils::filesystem::md5sum(local_file_name, current_md5);
        if (e != ERR_OK) {
            derror_replica("calculate file({}) md5 failed", local_file_name);
            download_err = e;
            return;
        }
        if (current_md5 != bf->get_md5sum()) {
            derror_replica(
                "local file({}) is different from remote file({}), download failed, md5: "
                "local({}) VS remote({})",
                local_file_name,
                bf->file_name(),
                current_md5,
                bf->get_md5sum());
            download_err = ERR_CORRUPTION;
            return;
        }
        ddebug_replica("download file({}) succeed, file_size = {}",
                       local_file_name.c_str(),
                       resp.downloaded_size);
        download_err = ERR_OK;
        download_file_size = resp.downloaded_size;
    };

    auto create_file_cb = [this,
                           &local_dir,
                           &download_err,
                           &download_file_size,
                           &download_file_callback_func,
                           &tracker](const dist::block_service::create_file_response &resp,
                                     const std::string &fname) {
        if (resp.err != ERR_OK) {
            derror_replica("create file({}) failed with error({})", fname, resp.err.to_string());
            download_err = resp.err;
            return;
        }

        dist::block_service::block_file *bf = resp.file_handle.get();
        if (bf->get_md5sum().empty()) {
            derror_replica("file({}) doesn't exist on remote file provider", bf->file_name());
            download_err = ERR_CORRUPTION;
            return;
        }

        const std::string &local_file_name = utils::filesystem::path_combine(local_dir, fname);
        // local file exists
        if (utils::filesystem::file_exists(local_file_name)) {
            std::string current_md5;
            error_code e = utils::filesystem::md5sum(local_file_name, current_md5);
            if (e != ERR_OK || current_md5 != bf->get_md5sum()) {
                if (e != ERR_OK) {
                    dwarn_replica("calculate file({}) md5 failed, should remove and redownload it",
                                  local_file_name);
                } else {
                    dwarn_replica(
                        "local file({}) is different from remote file({}), md5: local({}) VS "
                        "remote({}), should remove and redownload it",
                        local_file_name,
                        bf->file_name(),
                        current_md5,
                        bf->get_md5sum());
                }
                if (!utils::filesystem::remove_path(local_file_name)) {
                    derror_replica("failed to remove file({})", local_file_name);
                    download_err = e;
                    return;
                }
            } else {
                download_err = ERR_OK;
                download_file_size = bf->get_size();
                ddebug_replica("local file({}) has been downloaded, file size = {}",
                               local_file_name,
                               download_file_size);
                return;
            }
        }

        // download or redownload file
        bf->download(dist::block_service::download_request{local_file_name, 0, -1},
                     TASK_CODE_EXEC_INLINED,
                     std::bind(download_file_callback_func,
                               std::placeholders::_1,
                               resp.file_handle,
                               local_file_name),
                     &tracker);
    };

    const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
    fs->create_file(dist::block_service::create_file_request{remote_file_name, false},
                    TASK_CODE_EXEC_INLINED,
                    std::bind(create_file_cb, std::placeholders::_1, file_name),
                    &tracker);
    tracker.wait_outstanding_tasks();
    return download_err;
}

} // namespace replication
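
The checks made by the download callback in this file can be read as a small decision function. The sketch below is a simplified, synchronous restatement of that flow, not the code in the diff: `err`, `download_result`, and `verify_download` are hypothetical names, and the enum only mirrors the error names used above rather than the real `error_code` values.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical error values that mirror the names used in the diff.
enum class err
{
    ok,
    object_not_found,
    corruption,
    timeout
};

// Hypothetical summary of what the callback sees after a download attempt.
struct download_result
{
    err response_err;         // error reported by the remote provider
    uint64_t downloaded_size; // bytes actually written locally
    uint64_t remote_size;     // size recorded for the remote file
    std::string local_md5;    // md5 computed from the local file
    std::string remote_md5;   // md5 recorded for the remote file
};

inline err verify_download(const download_result &r)
{
    if (r.response_err != err::ok) {
        // A missing remote object is reported as corruption so that bulk load stops;
        // any other provider error is passed through unchanged.
        return r.response_err == err::object_not_found ? err::corruption : r.response_err;
    }
    if (r.downloaded_size != r.remote_size) {
        return err::corruption; // truncated or oversized download
    }
    if (r.local_md5 != r.remote_md5) {
        return err::corruption; // content differs from the remote file
    }
    return err::ok;
}
```

Expressing the checks as a pure function like this is one way to unit test the error mapping without a task tracker or a mock file system.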
8 changes: 8 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
@@ -26,6 +26,8 @@

#pragma once

#include "block_service_mock.h"

#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/replication/mutation_duplicator.h>
#include <dsn/dist/fmt_logging.h>
@@ -244,6 +246,12 @@ class mock_replica_stub : public replica_stub
    {
        _bulk_load_downloading_count.store(count);
    }

    std::unique_ptr<block_service_mock> get_block_filesystem()
    {
        std::unique_ptr<block_service_mock> block_service = make_unique<block_service_mock>();
        return block_service;
    }
};

class mock_log_file : public log_file
@@ -0,0 +1,96 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "replica_test_base.h"

#include <fstream>

#include <gtest/gtest.h>

namespace dsn {
namespace replication {

class replica_file_provider_test : public replica_test_base
{
public:
    replica_file_provider_test()
    {
        _replica = create_mock_replica(stub.get());
        _fs = stub->get_block_filesystem();
        utils::filesystem::create_directory(LOCAL_DIR);
    }

    ~replica_file_provider_test() { utils::filesystem::remove_path(LOCAL_DIR); }

public:
    error_code test_do_download()
    {
        uint64_t download_size = 0;
        return _replica->do_download(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size);
    }

    void create_local_file(const std::string &file_name)
    {
        std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
        utils::filesystem::create_file(whole_name);
        std::ofstream test_file;
        test_file.open(whole_name);
        test_file << "write some data.\n";
        test_file.close();

        _file_meta.name = whole_name;
        utils::filesystem::md5sum(whole_name, _file_meta.md5);
        utils::filesystem::file_size(whole_name, _file_meta.size);
    }

    void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
    {
        std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name);
        _fs->files[whole_file_name] = std::make_pair(size, md5);
    }

public:
    std::unique_ptr<mock_replica> _replica;
    std::unique_ptr<block_service_mock> _fs;

    file_meta _file_meta;
    std::string PROVIDER = "local_service";
    std::string LOCAL_DIR = "test_dir";
    std::string FILE_NAME = "test_file";
};

// do_download unit tests
TEST_F(replica_file_provider_test, do_download_remote_file_not_exist)
{
    ASSERT_EQ(test_do_download(), ERR_CORRUPTION);
}

TEST_F(replica_file_provider_test, do_download_redownload_file)
{
    // local file exists, but md5 not matched with remote file
    // expected to remove old local file and redownload it
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, 2333, "md5_not_match");
    ASSERT_EQ(test_do_download(), ERR_OK);

Member

After the redownload, it would be better to check that the local file exists and that its md5sum matches.


@hycdong hycdong May 25, 2020

I used block_file_mock here. It is a mock whose download function looks like this:

virtual dsn::task_ptr download(const download_request &req,
                               dsn::task_code code,
                               const download_callback &cb,
                               dsn::task_tracker *tracker = nullptr)
{
    return task_ptr();
}

It does not perform any actual download, so the local file does not exist in the unit test, and a real download is hard to mock.
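
One possible way to make the requested post-download checks testable is a fake whose download really writes a local file. The sketch below is not the existing `block_file_mock`: `fake_block_file`, its `content` field, `read_file`, and the synchronous size callback are all assumptions made for illustration, and it compares file contents directly instead of computing an md5.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <functional>
#include <iterator>
#include <string>

// Hypothetical fake: holds the "remote" bytes in memory.
struct fake_block_file
{
    std::string content;

    // Writes the remote bytes to local_path, then reports the written size
    // through the callback (0 if the file could not be opened).
    void download(const std::string &local_path,
                  const std::function<void(uint64_t)> &cb) const
    {
        std::ofstream out(local_path, std::ios::binary | std::ios::trunc);
        if (!out) {
            cb(0);
            return;
        }
        out << content;
        out.close();
        cb(content.size());
    }
};

// Reads a whole file back so a test can compare it with the remote content.
inline std::string read_file(const std::string &path)
{
    std::ifstream in(path, std::ios::binary);
    return std::string(std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>());
}
```

With a fake along these lines, a test could call `download`, then assert that the file exists locally and matches the remote content, which is the check the reviewer asked for.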

}

TEST_F(replica_file_provider_test, do_download_file_exist)
{
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);
    ASSERT_EQ(test_do_download(), ERR_OK);
}

TEST_F(replica_file_provider_test, do_download_succeed)
{
    create_local_file(FILE_NAME);
    create_remote_file(FILE_NAME, _file_meta.size, _file_meta.md5);
    // remove local file to mock condition that file not existed
    std::string file_name = utils::filesystem::path_combine(LOCAL_DIR, FILE_NAME);
    utils::filesystem::remove_path(file_name);
    ASSERT_EQ(test_do_download(), ERR_OK);

Member

After the download, check that the local file exists and that its md5sum matches.

}

} // namespace replication
} // namespace dsn