Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'newaio-with-tracer' of github.com:Shuo-Jia/rdsn into ne…
Browse files Browse the repository at this point in the history
…waio-with-tracer
foreverneverer committed Oct 19, 2020
2 parents d0bc147 + e46d657 commit 1e18d05
Showing 32 changed files with 737 additions and 652 deletions.
2 changes: 1 addition & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_DUPLICATION_SYNC, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_UPDATE_APP_ENV, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_DDD_DIAGNOSE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_APP_PARTITION_SPLIT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_PARTITION_SPLIT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_REGISTER_CHILD_REPLICA, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
7 changes: 7 additions & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
@@ -242,6 +242,13 @@ class replication_app_base : public replica_base

virtual ingestion_status::type get_ingestion_status() { return ingestion_status::IS_INVALID; }

virtual void on_detect_hotkey(const detect_hotkey_request &req,
/*out*/ detect_hotkey_response &resp)
{
resp.err = dsn::ERR_OBJECT_NOT_FOUND;
resp.__set_err_hint("on_detect_hotkey implementation not found");
}

public:
//
// utility functions to be used by app
4 changes: 4 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
@@ -191,6 +191,10 @@ class replication_ddl_client
detect_hotkey_request &req,
detect_hotkey_response &resp);

// partition split
error_with<start_partition_split_response> start_partition_split(const std::string &app_name,
int partition_count);

private:
bool static valid_app_char(int c);

9 changes: 9 additions & 0 deletions include/dsn/dist/replication/replication_enums.h
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ ENUM_REG(replication::partition_status::PS_ERROR)
ENUM_REG(replication::partition_status::PS_PRIMARY)
ENUM_REG(replication::partition_status::PS_SECONDARY)
ENUM_REG(replication::partition_status::PS_POTENTIAL_SECONDARY)
ENUM_REG(replication::partition_status::PS_PARTITION_SPLIT)
ENUM_END2(replication::partition_status::type, partition_status)

ENUM_BEGIN2(replication::read_semantic::type,
@@ -98,4 +99,12 @@ ENUM_BEGIN2(replication::detect_action::type, detect_action, replication::detect
ENUM_REG(replication::detect_action::START)
ENUM_REG(replication::detect_action::STOP)
ENUM_END2(replication::detect_action::type, detect_action)

ENUM_BEGIN2(replication::split_status::type, split_status, replication::split_status::NOT_SPLIT)
ENUM_REG(replication::split_status::NOT_SPLIT)
ENUM_REG(replication::split_status::SPLITTING)
ENUM_REG(replication::split_status::PAUSING)
ENUM_REG(replication::split_status::PAUSED)
ENUM_REG(replication::split_status::CANCELING)
ENUM_END2(replication::split_status::type, split_status)
}
96 changes: 52 additions & 44 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 9 additions & 32 deletions src/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
@@ -398,20 +398,6 @@ fds_file_object::fds_file_object(fds_service *s,
{
}

fds_file_object::fds_file_object(fds_service *s,
const std::string &name,
const std::string &fds_path,
const std::string &md5,
uint64_t size)
: block_file(name),
_service(s),
_fds_path(fds_path),
_md5sum(md5),
_size(size),
_has_meta_synced(true)
{
}

fds_file_object::~fds_file_object() {}

error_code fds_file_object::get_file_meta()
@@ -572,7 +558,7 @@ error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
return err;
}

ddebug("start to check meta data after successfully wrote data to fds");
ddebug("start to synchronize meta data after successfully wrote data to fds");
err = get_file_meta();
if (err == ERR_OK) {
transfered_bytes = _size;
@@ -653,14 +639,6 @@ dsn::task_ptr fds_file_object::read(const read_request &req,
{
read_future_ptr t(new read_future(code, cb, 0));
t->set_tracker(tracker);
read_response resp;
if (_has_meta_synced && _md5sum.empty()) {
derror("fds read failed: meta not synced or md5sum empty when read (%s)",
_fds_path.c_str());
resp.err = dsn::ERR_OBJECT_NOT_FOUND;
t->enqueue_with(resp);
return t;
}

add_ref();
auto read_in_background = [this, req, t]() {
@@ -691,14 +669,6 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
download_future_ptr t(new download_future(code, cb, 0));
t->set_tracker(tracker);
download_response resp;
if (_has_meta_synced && _md5sum.empty()) {
derror("fds download failed: meta not synced or md5sum empty when download (%s)",
_fds_path.c_str());
resp.err = dsn::ERR_OBJECT_NOT_FOUND;
resp.downloaded_size = 0;
t->enqueue_with(resp);
return t;
}

std::shared_ptr<std::ofstream> handle(new std::ofstream(
req.output_local_name, std::ios::binary | std::ios::out | std::ios::trunc));
@@ -722,9 +692,16 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
resp.err =
get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size);
resp.downloaded_size = 0;
if (handle->tellp() != -1)
if (resp.err == ERR_OK && handle->tellp() != -1) {
resp.downloaded_size = handle->tellp();
}
handle->close();
if (resp.err != ERR_OK && dsn::utils::filesystem::file_exists(req.output_local_name)) {
derror_f("fail to download file {} from fds, remove localfile {}",
_fds_path,
req.output_local_name);
dsn::utils::filesystem::remove_path(req.output_local_name);
}
t->enqueue_with(resp);
release_ref();
};
5 changes: 0 additions & 5 deletions src/block_service/fds/fds_service.h
Original file line number Diff line number Diff line change
@@ -68,11 +68,6 @@ class fds_file_object : public block_file
{
public:
fds_file_object(fds_service *s, const std::string &name, const std::string &fds_path);
fds_file_object(fds_service *s,
const std::string &name,
const std::string &fds_path,
const std::string &md5,
uint64_t size);

virtual ~fds_file_object();
virtual uint64_t get_size() override { return _size; }
26 changes: 12 additions & 14 deletions src/block_service/test/fds_service_test.cpp
Original file line number Diff line number Diff line change
@@ -118,19 +118,18 @@ void FDSClientTest::TearDown() {}

DEFINE_TASK_CODE(lpc_btest, TASK_PRIORITY_HIGH, dsn::THREAD_POOL_DEFAULT)

// TODO(zhangyifan): the test could not pass, should fix.
TEST_F(FDSClientTest, test_basic_operation)
{
const char *files[] = {"/fdstest1/test1/test1",
"/fdstest1/test1/test2",
"/fdstest1/test2/test1",
"/fdstest1/test2/test2",
"/fdstest2/test2",
"/fdstest3",
"/fds_rootfile",
const char *files[] = {"/fdstest/fdstest1/test1/test1",
"/fdstest/fdstest1/test1/test2",
"/fdstest/fdstest1/test2/test1",
"/fdstest/fdstest1/test2/test2",
"/fdstest/fdstest2/test2",
"/fdstest/fdstest3",
"/fdstest/fds_rootfile",
nullptr};
// ensure prefix_path is the prefix of some file in files
std::string prefix_path = std::string("/fdstest1/test1");
std::string prefix_path = std::string("/fdstest/fdstest1/test1");
int total_files;

std::shared_ptr<fds_service> s = std::make_shared<fds_service>();
@@ -264,12 +263,12 @@ TEST_F(FDSClientTest, test_basic_operation)
std::cout << "test ls files" << std::endl;

// list the root
std::cout << "list the root" << std::endl;
std::cout << "list the test root" << std::endl;
std::vector<ls_entry> root = {
{"fdstest1", true}, {"fdstest2", true}, {"fdstest3", false}, {"fds_rootfile", false}};
std::sort(root.begin(), root.end(), entry_cmp);

s->list_dir(ls_request{"/"},
s->list_dir(ls_request{"/fdstest"},
lpc_btest,
[&l_resp](const ls_response &resp) { l_resp = resp; },
nullptr)
@@ -390,7 +389,7 @@ TEST_F(FDSClientTest, test_basic_operation)
// try to read a non-exist file
{
std::cout << "test try to read non-exist file" << std::endl;
s->create_file(create_file_request{"fds_hellword", true},
s->create_file(create_file_request{"non_exist_file", true},
lpc_btest,
[&cf_resp](const create_file_response &r) { cf_resp = r; },
nullptr)
@@ -639,7 +638,6 @@ generate_file(const char *filename, unsigned long long file_size, char *block, u
close(fd);
}

// TODO(zhangyifan): the test could not pass, should fix.
TEST_F(FDSClientTest, test_concurrent_upload_download)
{
char block[1024];
@@ -756,7 +754,7 @@ TEST_F(FDSClientTest, test_concurrent_upload_download)
for (unsigned int i = 0; i < total_files; ++i) {
block_file_ptr p = block_files[i];
dsn::task_ptr t =
p->download(download_request{filenames[i] + ".b"},
p->download(download_request{filenames[i] + ".b", 0, -1},
lpc_btest,
[&filenames, &filesize, &md5, i, p](const download_response &dr) {
printf("file %s download finished\n", filenames[i].c_str());
Loading

0 comments on commit 1e18d05

Please sign in to comment.