From 48bed3a43fa590e5b9a462cf72acc54f51d5a490 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 30 Sep 2019 16:07:44 +0800 Subject: [PATCH 1/3] refactor: rename task::is_empty to is_callback_empty & move aio tests out from service_api_c --- include/dsn/tool-api/task.h | 6 ++- src/core/core/task.cpp | 12 ++--- src/core/tests/aio.cpp | 91 ++++++++++++++++++++++++++++++++ src/core/tests/service_api_c.cpp | 91 -------------------------------- 4 files changed, 101 insertions(+), 99 deletions(-) diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 0926e2d662..97478c04b4 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -235,7 +235,9 @@ class task : public ref_counter, public extensible_object, public trans error_code error() const { return _error; } service_node *node() const { return _node; } task_tracker *tracker() const { return _context_tracker.tracker(); } - bool is_empty() const { return _is_null; } + + /// \return Whether the callback of this task is empty + bool is_callback_empty() const { return _is_callback_empty; } // static helper utilities static task *get_current_task(); @@ -292,7 +294,7 @@ class task : public ref_counter, public extensible_object, public trans // virtual void clear_non_trivial_on_task_end() {} - bool _is_null; + bool _is_callback_empty; error_code _error; private: diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index ab98e73c65..487ec2b0b8 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -120,7 +120,7 @@ task::task(dsn::task_code code, int hash, service_node *node) _hash = hash; _delay_milliseconds = 0; _wait_for_cancel = false; - _is_null = false; + _is_callback_empty = false; next = nullptr; if (node != nullptr) { @@ -223,7 +223,7 @@ void task::exec_internal() } } - if (!_spec->allow_inline && !_is_null) { + if (!_spec->allow_inline && !_is_callback_empty) { lock_checker::check_dangling_lock(); } @@ -255,7 +255,7 @@ static void check_wait_task(task *waitee) return; // callee is empty - if (waitee->is_empty()) + if (waitee->is_callback_empty()) return; // there are enough concurrency @@ -413,7 +413,7 @@ void task::enqueue(task_worker_pool *pool) } // fast execution - if (_is_null) { + if (_is_callback_empty) { dassert(_node == task::get_current_node(), ""); exec_internal(); return; @@ -526,7 +526,7 @@ rpc_response_task::rpc_response_task(message_ex *request, node), _cb(std::move(cb)) { - _is_null = (_cb == nullptr); + _is_callback_empty = (_cb == nullptr); set_error_code(ERR_IO_PENDING); @@ -594,7 +594,7 @@ aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node) : task(code, hash, node), _cb(std::move(cb)) { - _is_null = (_cb == nullptr); + _is_callback_empty = (_cb == nullptr); dassert(TASK_TYPE_AIO == spec().type, "%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code", diff --git a/src/core/tests/aio.cpp b/src/core/tests/aio.cpp index c29d0d9b60..9b000f32f6 100644 --- a/src/core/tests/aio.cpp +++ b/src/core/tests/aio.cpp @@ -203,3 +203,94 @@ TEST(core, operation_failed) EXPECT_TRUE(utils::filesystem::remove_path("tmp_test_file")); } + +DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_READ, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) +DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_WRITE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) +struct aio_result +{ + dsn::error_code err; + size_t sz; +}; +TEST(core, dsn_file) +{ + if (task::get_current_disk() == nullptr) + return; + + int64_t fin_size, fout_size; + ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size)); + ASSERT_LT(0, fin_size); + + dsn::disk_file *fin = file::open("command.txt", O_RDONLY, 0); + ASSERT_NE(nullptr, fin); + dsn::disk_file *fout = file::open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666); + ASSERT_NE(nullptr, fout); + char buffer[1024]; + uint64_t offset = 0; + while (true) { + aio_result rin; + aio_task_ptr tin = file::read(fin, + buffer, + 1024, + offset, + LPC_AIO_TEST_READ, + nullptr, + [&rin](dsn::error_code err, size_t sz) { + rin.err = err; + rin.sz = sz; + }, + 0); + ASSERT_NE(nullptr, tin); + + if (dsn::tools::get_current_tool()->name() != "simulator") { + // at least 1 for tin, but if already read completed, then only 1 + ASSERT_LE(1, tin->get_count()); + } + + tin->wait(); + ASSERT_EQ(rin.err, tin->error()); + if (rin.err != ERR_OK) { + ASSERT_EQ(ERR_HANDLE_EOF, rin.err); + break; + } + ASSERT_LT(0u, rin.sz); + ASSERT_EQ(rin.sz, tin->get_transferred_size()); + // this is only true for simulator + if (dsn::tools::get_current_tool()->name() == "simulator") { + ASSERT_EQ(1, tin->get_count()); + } + + aio_result rout; + aio_task_ptr tout = file::write(fout, + buffer, + rin.sz, + offset, + LPC_AIO_TEST_WRITE, + nullptr, + [&rout](dsn::error_code err, size_t sz) { + rout.err = err; + rout.sz = sz; + }, + 0); + ASSERT_NE(nullptr, tout); + tout->wait(); + ASSERT_EQ(ERR_OK, rout.err); + ASSERT_EQ(ERR_OK, tout->error()); + ASSERT_EQ(rin.sz, rout.sz); + ASSERT_EQ(rin.sz, tout->get_transferred_size()); + // this is only true for simulator + if (dsn::tools::get_current_tool()->name() == "simulator") { + ASSERT_EQ(1, tout->get_count()); + } + + ASSERT_EQ(ERR_OK, file::flush(fout)); + + offset += rin.sz; + } + + ASSERT_EQ((uint64_t)fin_size, offset); + ASSERT_EQ(ERR_OK, file::close(fout)); + ASSERT_EQ(ERR_OK, file::close(fin)); + + ASSERT_TRUE(utils::filesystem::file_size("command.copy.txt", fout_size)); + ASSERT_EQ(fin_size, fout_size); +} diff --git a/src/core/tests/service_api_c.cpp b/src/core/tests/service_api_c.cpp index e0231d1497..7593fe2320 100644 --- a/src/core/tests/service_api_c.cpp +++ b/src/core/tests/service_api_c.cpp @@ -196,97 +196,6 @@ TEST(core, dsn_semaphore) s.wait(); } -DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_READ, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) -DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_WRITE, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) -struct aio_result -{ - dsn::error_code err; - size_t sz; -}; -TEST(core, dsn_file) -{ - if (task::get_current_disk() == nullptr) - return; - - int64_t fin_size, fout_size; - ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size)); - ASSERT_LT(0, fin_size); - - dsn::disk_file *fin = file::open("command.txt", O_RDONLY, 0); - ASSERT_NE(nullptr, fin); - dsn::disk_file *fout = file::open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666); - ASSERT_NE(nullptr, fout); - char buffer[1024]; - uint64_t offset = 0; - while (true) { - aio_result rin; - aio_task_ptr tin = file::read(fin, - buffer, - 1024, - offset, - LPC_AIO_TEST_READ, - nullptr, - [&rin](dsn::error_code err, size_t sz) { - rin.err = err; - rin.sz = sz; - }, - 0); - ASSERT_NE(nullptr, tin); - - if (dsn::tools::get_current_tool()->name() != "simulator") { - // at least 1 for tin, but if already read completed, then only 1 - ASSERT_LE(1, tin->get_count()); - } - - tin->wait(); - ASSERT_EQ(rin.err, tin->error()); - if (rin.err != ERR_OK) { - ASSERT_EQ(ERR_HANDLE_EOF, rin.err); - break; - } - ASSERT_LT(0u, rin.sz); - ASSERT_EQ(rin.sz, tin->get_transferred_size()); - // this is only true for simulator - if (dsn::tools::get_current_tool()->name() == "simulator") { - ASSERT_EQ(1, tin->get_count()); - } - - aio_result rout; - aio_task_ptr tout = file::write(fout, - buffer, - rin.sz, - offset, - LPC_AIO_TEST_WRITE, - nullptr, - [&rout](dsn::error_code err, size_t sz) { - rout.err = err; - rout.sz = sz; - }, - 0); - ASSERT_NE(nullptr, tout); - tout->wait(); - ASSERT_EQ(ERR_OK, rout.err); - ASSERT_EQ(ERR_OK, tout->error()); - ASSERT_EQ(rin.sz, rout.sz); - ASSERT_EQ(rin.sz, tout->get_transferred_size()); - // this is only true for simulator - if (dsn::tools::get_current_tool()->name() == "simulator") { - ASSERT_EQ(1, tout->get_count()); - } - - ASSERT_EQ(ERR_OK, file::flush(fout)); - - offset += rin.sz; - } - - ASSERT_EQ((uint64_t)fin_size, offset); - ASSERT_EQ(ERR_OK, file::close(fout)); - ASSERT_EQ(ERR_OK, file::close(fin)); - - ASSERT_TRUE(utils::filesystem::file_size("command.copy.txt", fout_size)); - ASSERT_EQ(fin_size, fout_size); -} - TEST(core, dsn_env) { if (dsn::service_engine::instance().spec().tool == "simulator") From bbb1e230d007c69cf54fdfa919a20454de75026f Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 8 Oct 2019 11:31:45 +0800 Subject: [PATCH 2/3] fix build --- src/core/tests/task_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index 5a7432d0a2..cc81eb4ea1 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -17,7 +17,7 @@ class task_test : public ::testing::Test static void test_init() { aio_task t1(LPC_TASK_TEST, nullptr); - ASSERT_TRUE(t1._is_null); + ASSERT_TRUE(t1._is_callback_empty); ASSERT_EQ(t1._wait_event.load(), nullptr); ASSERT_EQ(t1.next, nullptr); ASSERT_EQ(t1._state, task_state::TASK_STATE_READY); @@ -39,7 +39,7 @@ class task_test : public ::testing::Test ASSERT_TRUE(t1->wait(10000)); ASSERT_EQ(t1->_state, task_state::TASK_STATE_FINISHED); ASSERT_TRUE(t1->_wait_event.load() == nullptr); - ASSERT_TRUE(t1->_is_null); + ASSERT_TRUE(t1->_is_callback_empty); } static void test_signal_finished_task() From a2252e3d054817c7c1eae2b66516b91916706345 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 10 Oct 2019 10:38:00 +0800 Subject: [PATCH 3/3] revert changes on task::is_empty --- include/dsn/tool-api/task.h | 6 ++---- src/core/core/task.cpp | 12 ++++++------ src/core/tests/task_test.cpp | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index 97478c04b4..0926e2d662 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -235,9 +235,7 @@ class task : public ref_counter, public extensible_object, public trans error_code error() const { return _error; } service_node *node() const { return _node; } task_tracker *tracker() const { return _context_tracker.tracker(); } - - /// \return Whether the callback of this task is empty - bool is_callback_empty() const { return _is_callback_empty; } + bool is_empty() const { return _is_null; } // static helper utilities static task *get_current_task(); @@ -294,7 +292,7 @@ class task : public ref_counter, public extensible_object, public trans // virtual void clear_non_trivial_on_task_end() {} - bool _is_callback_empty; + bool _is_null; error_code _error; private: diff --git a/src/core/core/task.cpp b/src/core/core/task.cpp index 487ec2b0b8..ab98e73c65 100644 --- a/src/core/core/task.cpp +++ b/src/core/core/task.cpp @@ -120,7 +120,7 @@ task::task(dsn::task_code code, int hash, service_node *node) _hash = hash; _delay_milliseconds = 0; _wait_for_cancel = false; - _is_callback_empty = false; + _is_null = false; next = nullptr; if (node != nullptr) { @@ -223,7 +223,7 @@ void task::exec_internal() } } - if (!_spec->allow_inline && !_is_callback_empty) { + if (!_spec->allow_inline && !_is_null) { lock_checker::check_dangling_lock(); } @@ -255,7 +255,7 @@ static void check_wait_task(task *waitee) return; // callee is empty - if (waitee->is_callback_empty()) + if (waitee->is_empty()) return; // there are enough concurrency @@ -413,7 +413,7 @@ void task::enqueue(task_worker_pool *pool) } // fast execution - if (_is_callback_empty) { + if (_is_null) { dassert(_node == task::get_current_node(), ""); exec_internal(); return; @@ -526,7 +526,7 @@ rpc_response_task::rpc_response_task(message_ex *request, node), _cb(std::move(cb)) { - _is_callback_empty = (_cb == nullptr); + _is_null = (_cb == nullptr); set_error_code(ERR_IO_PENDING); @@ -594,7 +594,7 @@ aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node) : task(code, hash, node), _cb(std::move(cb)) { - _is_callback_empty = (_cb == nullptr); + _is_null = (_cb == nullptr); dassert(TASK_TYPE_AIO == spec().type, "%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code", diff --git a/src/core/tests/task_test.cpp b/src/core/tests/task_test.cpp index cc81eb4ea1..5a7432d0a2 100644 --- a/src/core/tests/task_test.cpp +++ b/src/core/tests/task_test.cpp @@ -17,7 +17,7 @@ class task_test : public ::testing::Test static void test_init() { aio_task t1(LPC_TASK_TEST, nullptr); - ASSERT_TRUE(t1._is_callback_empty); + ASSERT_TRUE(t1._is_null); ASSERT_EQ(t1._wait_event.load(), nullptr); ASSERT_EQ(t1.next, nullptr); ASSERT_EQ(t1._state, task_state::TASK_STATE_READY); @@ -39,7 +39,7 @@ class task_test : public ::testing::Test ASSERT_TRUE(t1->wait(10000)); ASSERT_EQ(t1->_state, task_state::TASK_STATE_FINISHED); ASSERT_TRUE(t1->_wait_event.load() == nullptr); - ASSERT_TRUE(t1->_is_callback_empty); + ASSERT_TRUE(t1->_is_null); } static void test_signal_finished_task()