Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: move aio tests out from service_api_c #323

Merged
merged 9 commits into from
Oct 12, 2019
Merged
6 changes: 4 additions & 2 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans
error_code error() const { return _error; }
service_node *node() const { return _node; }
task_tracker *tracker() const { return _context_tracker.tracker(); }
bool is_empty() const { return _is_null; }

/// \return Whether the callback of this task is empty
bool is_callback_empty() const { return _is_callback_empty; }

// static helper utilities
static task *get_current_task();
Expand Down Expand Up @@ -292,7 +294,7 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans
//
virtual void clear_non_trivial_on_task_end() {}

bool _is_null;
bool _is_callback_empty;
error_code _error;

private:
Expand Down
12 changes: 6 additions & 6 deletions src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ task::task(dsn::task_code code, int hash, service_node *node)
_hash = hash;
_delay_milliseconds = 0;
_wait_for_cancel = false;
_is_null = false;
_is_callback_empty = false;
next = nullptr;

if (node != nullptr) {
Expand Down Expand Up @@ -223,7 +223,7 @@ void task::exec_internal()
}
}

if (!_spec->allow_inline && !_is_null) {
if (!_spec->allow_inline && !_is_callback_empty) {
lock_checker::check_dangling_lock();
}

Expand Down Expand Up @@ -255,7 +255,7 @@ static void check_wait_task(task *waitee)
return;

// callee is empty
if (waitee->is_empty())
if (waitee->is_callback_empty())
return;

// there are enough concurrency
Expand Down Expand Up @@ -413,7 +413,7 @@ void task::enqueue(task_worker_pool *pool)
}

// fast execution
if (_is_null) {
if (_is_callback_empty) {
dassert(_node == task::get_current_node(), "");
exec_internal();
return;
Expand Down Expand Up @@ -526,7 +526,7 @@ rpc_response_task::rpc_response_task(message_ex *request,
node),
_cb(std::move(cb))
{
_is_null = (_cb == nullptr);
_is_callback_empty = (_cb == nullptr);
neverchanje marked this conversation as resolved.
Show resolved Hide resolved

set_error_code(ERR_IO_PENDING);

Expand Down Expand Up @@ -594,7 +594,7 @@ aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service
aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node)
: task(code, hash, node), _cb(std::move(cb))
{
_is_null = (_cb == nullptr);
_is_callback_empty = (_cb == nullptr);

dassert(TASK_TYPE_AIO == spec().type,
"%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code",
Expand Down
91 changes: 91 additions & 0 deletions src/core/tests/aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,94 @@ TEST(core, operation_failed)

EXPECT_TRUE(utils::filesystem::remove_path("tmp_test_file"));
}

DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_READ, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_WRITE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
struct aio_result
{
dsn::error_code err;
size_t sz;
};
TEST(core, dsn_file)
{
if (task::get_current_disk() == nullptr)
return;

int64_t fin_size, fout_size;
ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size));
ASSERT_LT(0, fin_size);

dsn::disk_file *fin = file::open("command.txt", O_RDONLY, 0);
ASSERT_NE(nullptr, fin);
dsn::disk_file *fout = file::open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666);
ASSERT_NE(nullptr, fout);
char buffer[1024];
uint64_t offset = 0;
while (true) {
aio_result rin;
aio_task_ptr tin = file::read(fin,
buffer,
1024,
offset,
LPC_AIO_TEST_READ,
nullptr,
[&rin](dsn::error_code err, size_t sz) {
rin.err = err;
rin.sz = sz;
},
0);
ASSERT_NE(nullptr, tin);

if (dsn::tools::get_current_tool()->name() != "simulator") {
// at least 1 for tin, but if already read completed, then only 1
ASSERT_LE(1, tin->get_count());
}

tin->wait();
ASSERT_EQ(rin.err, tin->error());
if (rin.err != ERR_OK) {
ASSERT_EQ(ERR_HANDLE_EOF, rin.err);
break;
}
ASSERT_LT(0u, rin.sz);
ASSERT_EQ(rin.sz, tin->get_transferred_size());
// this is only true for simulator
if (dsn::tools::get_current_tool()->name() == "simulator") {
ASSERT_EQ(1, tin->get_count());
}

aio_result rout;
aio_task_ptr tout = file::write(fout,
buffer,
rin.sz,
offset,
LPC_AIO_TEST_WRITE,
nullptr,
[&rout](dsn::error_code err, size_t sz) {
rout.err = err;
rout.sz = sz;
},
0);
ASSERT_NE(nullptr, tout);
tout->wait();
ASSERT_EQ(ERR_OK, rout.err);
ASSERT_EQ(ERR_OK, tout->error());
ASSERT_EQ(rin.sz, rout.sz);
ASSERT_EQ(rin.sz, tout->get_transferred_size());
// this is only true for simulator
if (dsn::tools::get_current_tool()->name() == "simulator") {
ASSERT_EQ(1, tout->get_count());
}

ASSERT_EQ(ERR_OK, file::flush(fout));

offset += rin.sz;
}

ASSERT_EQ((uint64_t)fin_size, offset);
ASSERT_EQ(ERR_OK, file::close(fout));
ASSERT_EQ(ERR_OK, file::close(fin));

ASSERT_TRUE(utils::filesystem::file_size("command.copy.txt", fout_size));
ASSERT_EQ(fin_size, fout_size);
}
91 changes: 0 additions & 91 deletions src/core/tests/service_api_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,97 +196,6 @@ TEST(core, dsn_semaphore)
s.wait();
}

DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_READ, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_WRITE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
struct aio_result
{
dsn::error_code err;
size_t sz;
};
TEST(core, dsn_file)
{
if (task::get_current_disk() == nullptr)
return;

int64_t fin_size, fout_size;
ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size));
ASSERT_LT(0, fin_size);

dsn::disk_file *fin = file::open("command.txt", O_RDONLY, 0);
ASSERT_NE(nullptr, fin);
dsn::disk_file *fout = file::open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666);
ASSERT_NE(nullptr, fout);
char buffer[1024];
uint64_t offset = 0;
while (true) {
aio_result rin;
aio_task_ptr tin = file::read(fin,
buffer,
1024,
offset,
LPC_AIO_TEST_READ,
nullptr,
[&rin](dsn::error_code err, size_t sz) {
rin.err = err;
rin.sz = sz;
},
0);
ASSERT_NE(nullptr, tin);

if (dsn::tools::get_current_tool()->name() != "simulator") {
// at least 1 for tin, but if already read completed, then only 1
ASSERT_LE(1, tin->get_count());
}

tin->wait();
ASSERT_EQ(rin.err, tin->error());
if (rin.err != ERR_OK) {
ASSERT_EQ(ERR_HANDLE_EOF, rin.err);
break;
}
ASSERT_LT(0u, rin.sz);
ASSERT_EQ(rin.sz, tin->get_transferred_size());
// this is only true for simulator
if (dsn::tools::get_current_tool()->name() == "simulator") {
ASSERT_EQ(1, tin->get_count());
}

aio_result rout;
aio_task_ptr tout = file::write(fout,
buffer,
rin.sz,
offset,
LPC_AIO_TEST_WRITE,
nullptr,
[&rout](dsn::error_code err, size_t sz) {
rout.err = err;
rout.sz = sz;
},
0);
ASSERT_NE(nullptr, tout);
tout->wait();
ASSERT_EQ(ERR_OK, rout.err);
ASSERT_EQ(ERR_OK, tout->error());
ASSERT_EQ(rin.sz, rout.sz);
ASSERT_EQ(rin.sz, tout->get_transferred_size());
// this is only true for simulator
if (dsn::tools::get_current_tool()->name() == "simulator") {
ASSERT_EQ(1, tout->get_count());
}

ASSERT_EQ(ERR_OK, file::flush(fout));

offset += rin.sz;
}

ASSERT_EQ((uint64_t)fin_size, offset);
ASSERT_EQ(ERR_OK, file::close(fout));
ASSERT_EQ(ERR_OK, file::close(fin));

ASSERT_TRUE(utils::filesystem::file_size("command.copy.txt", fout_size));
ASSERT_EQ(fin_size, fout_size);
}

TEST(core, dsn_env)
{
if (dsn::service_engine::instance().spec().tool == "simulator")
Expand Down
4 changes: 2 additions & 2 deletions src/core/tests/task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class task_test : public ::testing::Test
static void test_init()
{
aio_task t1(LPC_TASK_TEST, nullptr);
ASSERT_TRUE(t1._is_null);
ASSERT_TRUE(t1._is_callback_empty);
ASSERT_EQ(t1._wait_event.load(), nullptr);
ASSERT_EQ(t1.next, nullptr);
ASSERT_EQ(t1._state, task_state::TASK_STATE_READY);
Expand All @@ -39,7 +39,7 @@ class task_test : public ::testing::Test
ASSERT_TRUE(t1->wait(10000));
ASSERT_EQ(t1->_state, task_state::TASK_STATE_FINISHED);
ASSERT_TRUE(t1->_wait_event.load() == nullptr);
ASSERT_TRUE(t1->_is_null);
ASSERT_TRUE(t1->_is_callback_empty);
}

static void test_signal_finished_task()
Expand Down