Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor(bulk-load): add replica_bulk_loader class (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored May 21, 2020
1 parent 3669f6f commit 749f5e2
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 189 deletions.
2 changes: 1 addition & 1 deletion scripts/linux/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ echo "################################# start testing ##########################
if [ -z "$TEST_MODULE" ]
then
# Default, comma-separated list of test modules; applied only when the caller
# left TEST_MODULE empty or unset.
TEST_MODULE="dsn.core.tests,dsn.tests,dsn_nfs_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test"
TEST_MODULE="dsn.core.tests,dsn.tests,dsn_nfs_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test"
fi

echo "TEST_MODULE=$TEST_MODULE"
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ set(BACKUP_SRC
backup/replica_backup_manager.cpp
)

# Bulk load sources live in the bulk_load/ subdirectory, so they are listed
# explicitly here and appended to MY_PROJ_SRC below.
set(BULK_LOAD_SRC
bulk_load/replica_bulk_loader.cpp)

# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC
${DUPLICATION_SRC}
${BACKUP_SRC}
${BULK_LOAD_SRC}
)

# Search mode for source files under CURRENT project directory?
Expand Down Expand Up @@ -45,3 +49,5 @@ dsn_add_shared_library()
add_subdirectory(duplication/test)

add_subdirectory(backup/test)

add_subdirectory(bulk_load/test)

Large diffs are not rendered by default.

111 changes: 111 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include <dist/replication/lib/replica.h>
#include <dist/replication/lib/replica_context.h>
#include <dist/replication/lib/replica_stub.h>

namespace dsn {
namespace replication {

// Bulk load state and request handlers attached to a single replica.
//
// Holds a pointer to the owning `replica` and to a `replica_stub`, the current
// bulk load status, the parsed bulk load metadata, the overall download status
// and the per-file download tasks. Member functions are only declared here; the
// definitions are in replica_bulk_loader.cpp.
// NOTE(review): the base class is inherited privately (class default) — confirm
// this is intended.
class replica_bulk_loader : replica_base
{
public:
explicit replica_bulk_loader(replica *r);
~replica_bulk_loader();

// Handles a bulk load request and fills `response`.
// NOTE(review): the request presumably comes from the meta server (the private
// broadcast helper below names the same request type `meta_req`) — confirm.
void on_bulk_load(const bulk_load_request &request, /*out*/ bulk_load_response &response);

// Handles a group bulk load request and fills `response`.
// NOTE(review): presumably the secondary-side counterpart of
// `broadcast_group_bulk_load` — confirm against the .cpp.
void on_group_bulk_load(const group_bulk_load_request &request,
/*out*/ group_bulk_load_response &response);

private:
// Takes the bulk load request (`meta_req`) as input; judging by the name it sends
// group bulk load requests to the other group members — TODO confirm.
void broadcast_group_bulk_load(const bulk_load_request &meta_req);
// Callback for one group bulk load request: receives the error code together
// with the original request and its response.
void on_group_bulk_load_reply(error_code err,
const group_bulk_load_request &req,
const group_bulk_load_response &resp);

// Presumably drives the local bulk load according to `meta_status` — TODO confirm.
// \return an error_code; possible values are determined by the .cpp
error_code do_bulk_load(const std::string &app_name,
bulk_load_status::type meta_status,
const std::string &cluster_name,
const std::string &provider_name);

// start (or restart) downloading sst files from the remote provider
// \return ERR_BUSY if the node already has enough replicas executing downloading
// \return download errors from function `download_sst_files`
error_code bulk_load_start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);

// download metadata and sst files from remote provider
// metadata and sst files will be downloaded in {_dir}/.bulk_load directory
// \return ERR_FILE_OPERATION_FAILED: create local bulk load dir failed
// \return download metadata file error, see function `do_download`
// \return parse metadata file error, see function `parse_bulk_load_metadata`
error_code download_sst_files(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);

// parse the metadata file `fname` into `meta`
// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);

// NOTE(review): presumably checks the file described by `f_meta` inside
// `local_dir` and returns true when it is valid — confirm the exact checks.
bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
// NOTE(review): presumably accounts `file_size` bytes of `file_name` as
// downloaded — confirm.
void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);
// NOTE(review): presumably releases this replica's slot in the node-wide
// downloading count that `bulk_load_start_download` checks (ERR_BUSY) — confirm.
void try_decrease_bulk_load_download_count();

void clear_bulk_load_states();

// Fill `response` with this replica's bulk load state; `report_metadata`
// presumably controls whether the metadata is included — TODO confirm.
void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
bool report_metadata,
/*out*/ bulk_load_response &response);
// Same as above, but fills a group_bulk_load_response.
void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);

///
/// bulk load path on remote file provider:
/// <bulk_load_root>/<cluster_name>/<app_name>/{bulk_load_info}
/// <bulk_load_root>/<cluster_name>/<app_name>/<partition_index>/<file_name>
/// <bulk_load_root>/<cluster_name>/<app_name>/<partition_index>/bulk_load_metadata
///
// get partition's file dir on remote file provider:
// "<bulk_load_provider_root>/<cluster_name>/<app_name>/<pidx>", where the root
// is read from the replica's options
inline std::string get_remote_bulk_load_dir(const std::string &app_name,
const std::string &cluster_name,
uint32_t pidx) const
{
std::ostringstream oss;
oss << _replica->_options->bulk_load_provider_root << "/" << cluster_name << "/" << app_name
<< "/" << pidx;
return oss.str();
}

// plain accessors for the locally stored bulk load status
inline bulk_load_status::type get_bulk_load_status() const { return _status; }

inline void set_bulk_load_status(bulk_load_status::type status) { _status = status; }

//
// helper functions: thin forwards to the owning replica
//
partition_status::type status() const { return _replica->status(); }
ballot get_ballot() const { return _replica->get_ballot(); }
task_tracker *tracker() { return _replica->tracker(); }

private:
// NOTE(review): both are raw pointers; presumably non-owning — confirm in the
// constructor/destructor.
replica *_replica;
replica_stub *_stub;

friend class replica;
friend class replica_bulk_loader_test;

// bulk load status stored by this loader; starts as BLS_INVALID
bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
bulk_load_metadata _metadata;
// starts as ERR_OK; NOTE(review): presumably atomic because download tasks
// update it concurrently — confirm
std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;
};

} // namespace replication
} // namespace dsn
23 changes: 23 additions & 0 deletions src/dist/replication/lib/bulk_load/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Unit test binary for the replica bulk load code. The same name is listed in the
# default TEST_MODULE of scripts/linux/build.sh.
set(MY_PROJ_NAME dsn_replica_bulk_load_test)

# No sources from other directories are added explicitly.
set(MY_PROJ_SRC "")

set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS dsn_meta_server
dsn_replica_server
dsn_replication_common
dsn.block_service.local
dsn.block_service.fds
dsn_runtime
gtest
)

set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

# config-test.ini is the file main.cpp passes to dsn_run_config().
# NOTE(review): presumably these files are placed next to the test binary — confirm.
set(MY_BINPLACES
config-test.ini
run.sh
)

dsn_add_test()
75 changes: 75 additions & 0 deletions src/dist/replication/lib/bulk_load/test/config-test.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
[apps..default]
run = true
count = 1
;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536
;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536
;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536

; The app type "replica" is bound to gtest_app in main.cpp
; (register_factory<gtest_app>("replica")), whose start() calls RUN_ALL_TESTS().
[apps.replica]
type = replica
run = true
count = 1
ports = 54321
pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_LOCAL_SERVICE

[core]
;tool = simulator
tool = nativerun

;toollets = tracer, profiler
;fault_injector
pause_on_start = false

logging_start_level = LOG_LEVEL_DEBUG
logging_factory_name = dsn::tools::simple_logger


[tools.simple_logger]
fast_flush = true
short_header = false
stderr_start_level = LOG_LEVEL_WARNING

[tools.simulator]
random_seed = 1465902258

[tools.screen_logger]
short_header = false

[network]
; how many network threads for network library (used by asio)
io_service_worker_count = 2

; specification for each thread pool
[threadpool..default]
worker_count = 4

[threadpool.THREAD_POOL_DEFAULT]
name = default
partitioned = false
worker_priority = THREAD_xPRIORITY_NORMAL
worker_count = 2

[threadpool.THREAD_POOL_REPLICATION]
name = replica
partitioned = true
worker_priority = THREAD_xPRIORITY_NORMAL
worker_count = 3

[threadpool.THREAD_POOL_REPLICATION_LONG]
name = replica_long

[threadpool.THREAD_POOL_LOCAL_SERVICE]
name = local_service
worker_count = 1

[task..default]
is_trace = true
is_profile = true
allow_inline = false
rpc_call_channel = RPC_CHANNEL_TCP
rpc_message_header_format = dsn
rpc_timeout_milliseconds = 5000

[block_service.local_service]
type = local_service
args =
39 changes: 39 additions & 0 deletions src/dist/replication/lib/bulk_load/test/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include <dsn/service_api_cpp.h>

// Completion flag: set to 1 by gtest_app::start() once RUN_ALL_TESTS() has
// returned, and polled by main(). main() polls it while the app is started by the
// dsn runtime (presumably on a runtime thread), so it must be atomic: with a plain
// int the concurrent access is a data race and the polling loop may never observe
// the update.
std::atomic<int> g_test_count{0};

// Result of RUN_ALL_TESTS(). It is written before the atomic store to
// g_test_count and read only after main() has observed g_test_count != 0, so the
// (sequentially consistent) store/load pair orders the two accesses.
int g_test_ret = 0;

// Service app that runs the whole gtest suite when it is started.
class gtest_app : public dsn::service_app
{
public:
    gtest_app(const dsn::service_app_info *info) : ::dsn::service_app(info) {}

    // Runs every registered test, publishes the result, then reports ERR_OK so
    // that app start-up itself never fails; test failures travel via g_test_ret.
    dsn::error_code start(const std::vector<std::string> & /*args*/) override
    {
        g_test_ret = RUN_ALL_TESTS();
        // Must stay after the g_test_ret write: this store publishes the result.
        g_test_count = 1;
        return dsn::ERR_OK;
    }

    dsn::error_code stop(bool) override { return dsn::ERR_OK; }
};

// Test entry point: registers gtest_app under the app type "replica", starts the
// runtime with config-test.ini, waits until the tests have finished and exits with
// the gtest result.
GTEST_API_ int main(int argc, char **argv)
{
    testing::InitGoogleTest(&argc, argv);

    dsn::service_app::register_factory<gtest_app>("replica");

    dsn_run_config("config-test.ini", false);
    // Poll once per second until gtest_app::start() signals completion.
    while (g_test_count.load() == 0) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    dsn_exit(g_test_ret);
}
Loading

0 comments on commit 749f5e2

Please sign in to comment.