diff --git a/src/aio/CMakeLists.txt b/src/aio/CMakeLists.txt index 45d24cf687..3754361d06 100644 --- a/src/aio/CMakeLists.txt +++ b/src/aio/CMakeLists.txt @@ -33,7 +33,7 @@ set(MY_PROJ_SRC "") #"GLOB" for non - recursive search set(MY_SRC_SEARCH_MODE "GLOB") -set(MY_PROJ_LIBS dsn_runtime) +set(MY_PROJ_LIBS dsn_runtime rocksdb) #Extra files that will be installed set(MY_BINPLACES "") diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h index 73848d87be..74fb410bd7 100644 --- a/src/aio/aio_provider.h +++ b/src/aio/aio_provider.h @@ -27,10 +27,17 @@ #pragma once #include +#include +#include #include "utils/error_code.h" #include "utils/factory_store.h" +namespace rocksdb { +class RandomAccessFile; +class RandomRWFile; +} // namespace rocksdb + namespace dsn { class aio_context; @@ -60,12 +67,13 @@ class aio_provider explicit aio_provider(disk_engine *disk); virtual ~aio_provider() = default; - virtual linux_fd_t open(const char *file_name, int flag, int pmode) = 0; + virtual std::unique_ptr open_read_file(const std::string &fname) = 0; + virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0; - virtual error_code close(linux_fd_t fd) = 0; - virtual error_code flush(linux_fd_t fd) = 0; + virtual std::unique_ptr open_write_file(const std::string &fname) = 0; virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0; - virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0; + virtual error_code flush(rocksdb::RandomRWFile *rwf) = 0; + virtual error_code close(rocksdb::RandomRWFile *rwf) = 0; // Submits the aio_task to the underlying disk-io executor. // This task may not be executed immediately, call `aio_task::wait` diff --git a/src/aio/disk_engine.cpp b/src/aio/disk_engine.cpp index 0e25bfaac6..1b104be301 100644 --- a/src/aio/disk_engine.cpp +++ b/src/aio/disk_engine.cpp @@ -102,22 +102,26 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength) return first; } -disk_file::disk_file(linux_fd_t fd) : _fd(fd) {} +disk_file::disk_file(std::unique_ptr rf) : _read_file(std::move(rf)) {} +disk_file::disk_file(std::unique_ptr wf) : _write_file(std::move(wf)) {} aio_task *disk_file::read(aio_task *tsk) { + CHECK(_read_file, ""); tsk->add_ref(); // release on completion, see `on_read_completed`. return _read_queue.add_work(tsk, nullptr); } aio_task *disk_file::write(aio_task *tsk, void *ctx) { + CHECK(_write_file, ""); tsk->add_ref(); // release on completion return _write_queue.add_work(tsk, ctx); } aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size) { + CHECK(_read_file, ""); CHECK(wk->next == nullptr, ""); auto ret = _read_queue.on_work_completed(wk, nullptr); wk->enqueue(err, size); @@ -128,6 +132,7 @@ aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size) { + CHECK(_write_file, ""); auto ret = _write_queue.on_work_completed(wk, ctx); while (wk) { diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h index 8eb7846359..70cce466a1 100644 --- a/src/aio/disk_engine.h +++ b/src/aio/disk_engine.h @@ -32,6 +32,7 @@ #include "aio/aio_task.h" #include "aio_provider.h" +#include "rocksdb/env.h" #include "utils/singleton.h" #include "utils/work_queue.h" @@ -56,17 +57,21 @@ class disk_write_queue : public work_queue class disk_file { public: - explicit disk_file(linux_fd_t fd); + explicit disk_file(std::unique_ptr rf); + explicit disk_file(std::unique_ptr wf); aio_task *read(aio_task *tsk); aio_task *write(aio_task *tsk, void *ctx); aio_task *on_read_completed(aio_task *wk, error_code err, size_t size); aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size); - linux_fd_t native_handle() const { return _fd; } + rocksdb::RandomAccessFile *rfile() const { return _read_file.get(); } + rocksdb::RandomRWFile *wfile() const { return _write_file.get(); } private: - linux_fd_t _fd; + // TODO(yingchun): unify to use a single RandomRWFile member variable. + std::unique_ptr _read_file; + std::unique_ptr _write_file; disk_write_queue _write_queue; work_queue _read_queue; }; diff --git a/src/aio/file_io.cpp b/src/aio/file_io.cpp index 4f3c10bb54..a4bf26ba85 100644 --- a/src/aio/file_io.cpp +++ b/src/aio/file_io.cpp @@ -26,32 +26,51 @@ #include "aio/file_io.h" +#include // IWYU pragma: no_include #include #include "aio/aio_provider.h" #include "disk_engine.h" +#include "rocksdb/env.h" +#include "utils/fmt_logging.h" namespace dsn { class task_tracker; namespace file { -/*extern*/ disk_file *open(const char *file_name, int flag, int pmode) +/*extern*/ disk_file *open(const std::string &fname, FileOpenType type) { - auto fd = disk_engine::provider().open(file_name, flag, pmode); - if (fd.is_invalid()) { - return nullptr; + switch (type) { + case FileOpenType::kReadOnly: { + auto sf = disk_engine::provider().open_read_file(fname); + if (!sf) { + return nullptr; + } + return new disk_file(std::move(sf)); } - - return new disk_file(fd); + case FileOpenType::kWriteOnly: { + auto wf = disk_engine::provider().open_write_file(fname); + if (!wf) { + return nullptr; + } + return new disk_file(std::move(wf)); + } + default: + CHECK(false, ""); + } + return nullptr; } /*extern*/ error_code close(disk_file *file) { - error_code result = ERR_INVALID_HANDLE; + error_code result = ERR_OK; if (file != nullptr) { - result = disk_engine::provider().close(file->native_handle()); + // A read file is not needed to close. + if (file->wfile()) { + result = disk_engine::provider().close(file->wfile()); + } delete file; file = nullptr; } @@ -60,11 +79,11 @@ namespace file { /*extern*/ error_code flush(disk_file *file) { - if (nullptr != file) { - return disk_engine::provider().flush(file->native_handle()); - } else { + if (file == nullptr || file->wfile() == nullptr) { return ERR_INVALID_HANDLE; } + + return disk_engine::provider().flush(file->wfile()); } /*extern*/ aio_task_ptr read(disk_file *file, @@ -84,7 +103,8 @@ namespace file { cb->get_aio_context()->engine = &disk_engine::instance(); cb->get_aio_context()->dfile = file; - if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true)) { + if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true) || + file->rfile() == nullptr) { cb->enqueue(ERR_FILE_OPERATION_FAILED, 0); return cb; } @@ -110,6 +130,10 @@ namespace file { cb->get_aio_context()->file_offset = offset; cb->get_aio_context()->type = AIO_Write; cb->get_aio_context()->dfile = file; + if (file->wfile() == nullptr) { + cb->enqueue(ERR_FILE_OPERATION_FAILED, 0); + return cb; + } disk_engine::instance().write(cb); return cb; diff --git a/src/aio/file_io.h b/src/aio/file_io.h index f0b6ffc420..0cef3f1b80 100644 --- a/src/aio/file_io.h +++ b/src/aio/file_io.h @@ -28,6 +28,7 @@ #include #include +#include #include #include "aio/aio_task.h" @@ -47,6 +48,13 @@ class task_tracker; namespace file { +enum class FileOpenType +{ + kReadOnly = 0, + kWriteOnly +}; + +// TODO(yingchun): consider to return a smart pointer /// open file /// /// \param file_name filename of the file. @@ -55,7 +63,7 @@ namespace file { /// /// \return file handle /// -extern disk_file *open(const char *file_name, int flag, int pmode); +extern disk_file *open(const std::string &fname, FileOpenType type); /// close the file handle extern error_code close(disk_file *file); diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp index 56a0efa1e1..7270dff733 100644 --- a/src/aio/native_linux_aio_provider.cpp +++ b/src/aio/native_linux_aio_provider.cpp @@ -26,19 +26,17 @@ #include "native_linux_aio_provider.h" -#include -#include -#include -#include - #include "aio/aio_provider.h" #include "aio/disk_engine.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" #include "runtime/service_engine.h" #include "runtime/task/async_calls.h" +#include "utils/env.h" #include "utils/fmt_logging.h" #include "utils/latency_tracer.h" #include "utils/ports.h" -#include "utils/safe_strerror_posix.h" namespace dsn { @@ -46,87 +44,101 @@ native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_pr native_linux_aio_provider::~native_linux_aio_provider() {} -linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode) +std::unique_ptr +native_linux_aio_provider::open_read_file(const std::string &fname) { - auto fd = ::open(file_name, flag, pmode); - if (fd == DSN_INVALID_FILE_HANDLE) { - LOG_ERROR("create file '{}' failed, err = {}", file_name, utils::safe_strerror(errno)); + std::unique_ptr rfile; + auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive) + ->NewRandomAccessFile(fname, &rfile, rocksdb::EnvOptions()); + if (!s.ok()) { + LOG_ERROR("open read file '{}' failed, err = {}", fname, s.ToString()); } - return linux_fd_t(fd); + return rfile; } -error_code native_linux_aio_provider::close(linux_fd_t fd) +std::unique_ptr +native_linux_aio_provider::open_write_file(const std::string &fname) { - if (fd.is_invalid() || ::close(fd.fd) == 0) { - return ERR_OK; + // rocksdb::NewRandomRWFile() doesn't act as the docs described, it will not create the + // file if it not exists, and an error Status will be returned, so we try to create the + // file by ReopenWritableFile() if it not exist. + auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)->FileExists(fname); + if (!s.ok() && !s.IsNotFound()) { + LOG_ERROR("failed to check whether the file '{}' exist, err = {}", fname, s.ToString()); + return nullptr; + } + + if (s.IsNotFound()) { + std::unique_ptr cfile; + s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive) + ->ReopenWritableFile(fname, &cfile, rocksdb::EnvOptions()); + if (!s.ok()) { + LOG_ERROR("failed to create file '{}', err = {}", fname, s.ToString()); + return nullptr; + } } - LOG_ERROR("close file failed, err = {}", utils::safe_strerror(errno)); - return ERR_FILE_OPERATION_FAILED; + // Open the file for write as RandomRWFile, to support un-sequential write. + std::unique_ptr wfile; + s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive) + ->NewRandomRWFile(fname, &wfile, rocksdb::EnvOptions()); + if (!s.ok()) { + LOG_ERROR("open write file '{}' failed, err = {}", fname, s.ToString()); + } + return wfile; } -error_code native_linux_aio_provider::flush(linux_fd_t fd) +error_code native_linux_aio_provider::close(rocksdb::RandomRWFile *wf) { - if (fd.is_invalid() || ::fsync(fd.fd) == 0) { - return ERR_OK; + auto s = wf->Close(); + if (!s.ok()) { + LOG_ERROR("close file failed, err = {}", s.ToString()); + return ERR_FILE_OPERATION_FAILED; } - LOG_ERROR("flush file failed, err = {}", utils::safe_strerror(errno)); - return ERR_FILE_OPERATION_FAILED; + return ERR_OK; +} + +error_code native_linux_aio_provider::flush(rocksdb::RandomRWFile *wf) +{ + auto s = wf->Fsync(); + if (!s.ok()) { + LOG_ERROR("flush file failed, err = {}", s.ToString()); + return ERR_FILE_OPERATION_FAILED; + } + + return ERR_OK; } error_code native_linux_aio_provider::write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) { - dsn::error_code resp = ERR_OK; - uint64_t buffer_offset = 0; - do { - // ret is the written data size - auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd, - (char *)aio_ctx.buffer + buffer_offset, - aio_ctx.buffer_size - buffer_offset, - aio_ctx.file_offset + buffer_offset); - if (dsn_unlikely(ret < 0)) { - if (errno == EINTR) { - LOG_WARNING("write failed with errno={} and will retry it.", - utils::safe_strerror(errno)); - continue; - } - resp = ERR_FILE_OPERATION_FAILED; - LOG_ERROR("write failed with errno={}, return {}.", utils::safe_strerror(errno), resp); - return resp; - } - - buffer_offset += ret; - if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) { - LOG_WARNING( - "write incomplete, request_size={}, total_write_size={}, this_write_size={}, " - "and will retry it.", - aio_ctx.buffer_size, - buffer_offset, - ret); - } - } while (dsn_unlikely(buffer_offset < aio_ctx.buffer_size)); + rocksdb::Slice data((const char *)(aio_ctx.buffer), aio_ctx.buffer_size); + auto s = aio_ctx.dfile->wfile()->Write(aio_ctx.file_offset, data); + if (!s.ok()) { + LOG_ERROR("write file failed, err = {}", s.ToString()); + return ERR_FILE_OPERATION_FAILED; + } - *processed_bytes = buffer_offset; - return resp; + *processed_bytes = aio_ctx.buffer_size; + return ERR_OK; } error_code native_linux_aio_provider::read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) { - auto ret = ::pread(aio_ctx.dfile->native_handle().fd, - aio_ctx.buffer, - aio_ctx.buffer_size, - aio_ctx.file_offset); - if (dsn_unlikely(ret < 0)) { - LOG_WARNING("write failed with errno={} and will retry it.", utils::safe_strerror(errno)); + rocksdb::Slice result; + auto s = aio_ctx.dfile->rfile()->Read( + aio_ctx.file_offset, aio_ctx.buffer_size, &result, (char *)(aio_ctx.buffer)); + if (!s.ok()) { + LOG_ERROR("read file failed, err = {}", s.ToString()); return ERR_FILE_OPERATION_FAILED; } - if (ret == 0) { + + if (result.empty()) { return ERR_HANDLE_EOF; } - *processed_bytes = static_cast(ret); + *processed_bytes = result.size(); return ERR_OK; } diff --git a/src/aio/native_linux_aio_provider.h b/src/aio/native_linux_aio_provider.h index bdb1339b9c..538b808dfa 100644 --- a/src/aio/native_linux_aio_provider.h +++ b/src/aio/native_linux_aio_provider.h @@ -27,11 +27,18 @@ #pragma once #include +#include +#include #include "aio/aio_task.h" #include "aio_provider.h" #include "utils/error_code.h" +namespace rocksdb { +class RandomAccessFile; +class RandomRWFile; +} // namespace rocksdb + namespace dsn { class disk_engine; @@ -41,16 +48,18 @@ class native_linux_aio_provider : public aio_provider explicit native_linux_aio_provider(disk_engine *disk); ~native_linux_aio_provider() override; - linux_fd_t open(const char *file_name, int flag, int pmode) override; - error_code close(linux_fd_t fd) override; - error_code flush(linux_fd_t fd) override; - error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override; + std::unique_ptr open_read_file(const std::string &fname) override; error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override; + std::unique_ptr open_write_file(const std::string &fname) override; + error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override; + error_code flush(rocksdb::RandomRWFile *wf) override; + error_code close(rocksdb::RandomRWFile *wf) override; + void submit_aio_task(aio_task *aio) override; aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; } -protected: +private: error_code aio_internal(aio_task *aio); }; diff --git a/src/aio/test/CMakeLists.txt b/src/aio/test/CMakeLists.txt index c1b0a44e46..357499a9c8 100644 --- a/src/aio/test/CMakeLists.txt +++ b/src/aio/test/CMakeLists.txt @@ -33,7 +33,7 @@ set(MY_PROJ_SRC "") # "GLOB" for non-recursive search set(MY_SRC_SEARCH_MODE "GLOB") -set(MY_PROJ_LIBS gtest dsn_runtime dsn_aio rocksdb) +set(MY_PROJ_LIBS gtest dsn_runtime dsn_aio test_utils rocksdb) set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) diff --git a/src/aio/test/aio.cpp b/src/aio/test/aio.cpp index a95cbeb8f0..fa6b0114ae 100644 --- a/src/aio/test/aio.cpp +++ b/src/aio/test/aio.cpp @@ -24,7 +24,7 @@ * THE SOFTWARE. */ -#include +#include // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include @@ -48,11 +48,25 @@ #include "utils/env.h" #include "utils/error_code.h" #include "utils/filesystem.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #include "utils/test_macros.h" #include "utils/threadpool_code.h" +DSN_DEFINE_uint32(aio_test, + op_buffer_size, + 12, + "The buffer size of each aio read or write operation for the aio_test.basic"); +DSN_DEFINE_uint32(aio_test, + total_op_count, + 100, + "The total count of read or write operations for the aio_test.basic"); +DSN_DEFINE_uint32( + aio_test, + op_count_per_batch, + 10, + "The operation count of per read or write batch operation for the aio_test.basic"); + using namespace ::dsn; DEFINE_THREAD_POOL_CODE(THREAD_POOL_TEST_SERVER) @@ -62,6 +76,7 @@ class aio_test : public pegasus::encrypt_data_test_base { public: void SetUp() override { utils::filesystem::remove_path(kTestFileName); } + void TearDown() override { utils::filesystem::remove_path(kTestFileName); } const std::string kTestFileName = "aio_test.txt"; }; @@ -71,10 +86,10 @@ INSTANTIATE_TEST_CASE_P(, aio_test, ::testing::Values(false)); TEST_P(aio_test, basic) { - const char *kUnitBuffer = "hello, world"; - const size_t kUnitBufferLength = strlen(kUnitBuffer); - const int kTotalBufferCount = 100; - const int kBufferCountPerBatch = 10; + const size_t kUnitBufferLength = FLAGS_op_buffer_size; + const std::string kUnitBuffer(kUnitBufferLength, 'x'); + const int kTotalBufferCount = FLAGS_total_op_count; + const int kBufferCountPerBatch = FLAGS_op_count_per_batch; const int64_t kFileSize = kUnitBufferLength * kTotalBufferCount; ASSERT_EQ(0, kTotalBufferCount % kBufferCountPerBatch); @@ -86,15 +101,16 @@ TEST_P(aio_test, basic) auto verify_data = [=]() { int64_t file_size; ASSERT_TRUE(utils::filesystem::file_size( - kTestFileName.c_str(), dsn::utils::FileDataType::kSensitive, file_size)); + kTestFileName, dsn::utils::FileDataType::kSensitive, file_size)); ASSERT_EQ(kFileSize, file_size); // Create a read file handler. - auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0); + auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly); ASSERT_NE(rfile, nullptr); // 1. Check sequential read. { + pegasus::stop_watch sw; uint64_t offset = 0; std::list tasks; for (int i = 0; i < kTotalBufferCount; i++) { @@ -111,12 +127,14 @@ TEST_P(aio_test, basic) t->wait(); ASSERT_EQ(kUnitBufferLength, t->get_transferred_size()); - ASSERT_STREQ(kUnitBuffer, read_buffer); + ASSERT_STREQ(kUnitBuffer.c_str(), read_buffer); } + sw.stop_and_output(fmt::format("sequential read")); } // 2. Check concurrent read. { + pegasus::stop_watch sw; uint64_t offset = 0; std::list tasks; char read_buffers[kTotalBufferCount][kUnitBufferLength + 1]; @@ -137,22 +155,24 @@ TEST_P(aio_test, basic) ASSERT_EQ(kUnitBufferLength, t->get_transferred_size()); } for (int i = 0; i < kTotalBufferCount; i++) { - ASSERT_STREQ(kUnitBuffer, read_buffers[i]); + ASSERT_STREQ(kUnitBuffer.c_str(), read_buffers[i]); } + sw.stop_and_output(fmt::format("concurrent read")); } ASSERT_EQ(ERR_OK, file::close(rfile)); }; // 1. Sequential write. { - auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + pegasus::stop_watch sw; + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); uint64_t offset = 0; std::list tasks; for (int i = 0; i < kTotalBufferCount; i++) { auto t = ::dsn::file::write(wfile, - kUnitBuffer, + kUnitBuffer.c_str(), kUnitBufferLength, offset, LPC_AIO_TEST, @@ -167,12 +187,14 @@ TEST_P(aio_test, basic) } ASSERT_EQ(ERR_OK, file::flush(wfile)); ASSERT_EQ(ERR_OK, file::close(wfile)); + sw.stop_and_output(fmt::format("sequential write")); } NO_FATALS(verify_data()); // 2. Un-sequential write. { - auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + pegasus::stop_watch sw; + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); std::vector offsets; @@ -188,7 +210,7 @@ TEST_P(aio_test, basic) std::list tasks; for (const auto &offset : offsets) { auto t = ::dsn::file::write(wfile, - kUnitBuffer, + kUnitBuffer.c_str(), kUnitBufferLength, offset, LPC_AIO_TEST, @@ -202,19 +224,21 @@ TEST_P(aio_test, basic) } ASSERT_EQ(ERR_OK, file::flush(wfile)); ASSERT_EQ(ERR_OK, file::close(wfile)); + sw.stop_and_output(fmt::format("un-sequential write")); } NO_FATALS(verify_data()); // 3. Overwrite. { - auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + pegasus::stop_watch sw; + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); uint64_t offset = 0; std::list tasks; for (int i = 0; i < kTotalBufferCount; i++) { auto t = ::dsn::file::write(wfile, - kUnitBuffer, + kUnitBuffer.c_str(), kUnitBufferLength, offset, LPC_AIO_TEST, @@ -229,19 +253,21 @@ TEST_P(aio_test, basic) } ASSERT_EQ(ERR_OK, file::flush(wfile)); ASSERT_EQ(ERR_OK, file::close(wfile)); + sw.stop_and_output(fmt::format("overwrite")); } NO_FATALS(verify_data()); // 4. Vector write. { - auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + pegasus::stop_watch sw; + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); uint64_t offset = 0; std::list tasks; std::unique_ptr buffers(new dsn_file_buffer_t[kBufferCountPerBatch]); for (int i = 0; i < kBufferCountPerBatch; i++) { - buffers[i].buffer = static_cast(const_cast(kUnitBuffer)); + buffers[i].buffer = static_cast(const_cast(kUnitBuffer.c_str())); buffers[i].size = kUnitBufferLength; } for (int i = 0; i < kTotalBufferCount / kBufferCountPerBatch; i++) { @@ -264,16 +290,17 @@ TEST_P(aio_test, basic) } ASSERT_EQ(ERR_OK, file::flush(wfile)); ASSERT_EQ(ERR_OK, file::close(wfile)); + sw.stop_and_output(fmt::format("vector write")); } NO_FATALS(verify_data()); } TEST_P(aio_test, aio_share) { - auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); - auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0); + auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly); ASSERT_NE(rfile, nullptr); ASSERT_EQ(ERR_OK, file::close(wfile)); @@ -289,7 +316,7 @@ TEST_P(aio_test, operation_failed) *count = n; }; - auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); char buff[512] = {0}; @@ -305,7 +332,7 @@ TEST_P(aio_test, operation_failed) t->wait(); ASSERT_EQ(ERR_FILE_OPERATION_FAILED, *err); - auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0); + auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly); ASSERT_NE(nullptr, rfile); t = ::dsn::file::read(rfile, buff, 512, 0, LPC_AIO_TEST, nullptr, io_callback, 0); @@ -357,9 +384,9 @@ TEST_P(aio_test, dsn_file) ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(src_file, src_file_md5)); ASSERT_FALSE(src_file_md5.empty()); - auto fin = file::open(src_file.c_str(), O_RDONLY | O_BINARY, 0); + auto fin = file::open(src_file, file::FileOpenType::kReadOnly); ASSERT_NE(nullptr, fin); - auto fout = file::open(dst_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666); + auto fout = file::open(dst_file, file::FileOpenType::kWriteOnly); ASSERT_NE(nullptr, fout); char kUnitBuffer[1024]; uint64_t offset = 0; diff --git a/src/aio/test/config.ini b/src/aio/test/config.ini index 47bc9cf7fb..fd46a38d75 100644 --- a/src/aio/test/config.ini +++ b/src/aio/test/config.ini @@ -43,3 +43,8 @@ tool = nativerun pause_on_start = false logging_start_level = LOG_LEVEL_DEBUG logging_factory_name = dsn::tools::simple_logger + +[aio_test] +op_buffer_size = 12 +total_op_count = 100 +op_count_per_batch = 10 diff --git a/src/meta/meta_state_service_simple.cpp b/src/meta/meta_state_service_simple.cpp index 6ba00176ea..776cfba342 100644 --- a/src/meta/meta_state_service_simple.cpp +++ b/src/meta/meta_state_service_simple.cpp @@ -26,7 +26,6 @@ #include "meta_state_service_simple.h" -#include #include #include #include @@ -44,7 +43,6 @@ #include "utils/binary_reader.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #include "utils/strings.h" #include "utils/utils.h" @@ -314,7 +312,7 @@ error_code meta_state_service_simple::initialize(const std::vector } } - _log = file::open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + _log = file::open(log_path, file::FileOpenType::kWriteOnly); if (!_log) { LOG_ERROR("open file failed: {}", log_path); return ERR_FILE_OPERATION_FAILED; diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp index c0ced7b8c6..910bae8ec0 100644 --- a/src/nfs/nfs_client_impl.cpp +++ b/src/nfs/nfs_client_impl.cpp @@ -27,7 +27,6 @@ #include "nfs_client_impl.h" // IWYU pragma: no_include -#include #include #include "nfs/nfs_code_definition.h" @@ -38,7 +37,6 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #include "utils/string_conv.h" #include "utils/token_buckets.h" @@ -460,8 +458,7 @@ void nfs_client_impl::continue_write() // double check zauto_lock l(fc->user_req->user_req_lock); if (!fc->file_holder->file_handle) { - fc->file_holder->file_handle = - file::open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + fc->file_holder->file_handle = file::open(file_path, file::FileOpenType::kWriteOnly); } } @@ -470,6 +467,10 @@ void nfs_client_impl::continue_write() LOG_ERROR("open file {} failed", file_path); handle_completion(fc->user_req, ERR_FILE_OPERATION_FAILED); } else { + LOG_DEBUG("nfs: copy to file {} [{}, {}]", + file_path, + reqc->response.offset, + reqc->response.offset + reqc->response.size); zauto_lock l(reqc->lock); if (reqc->is_valid) { reqc->local_write_task = file::write(fc->file_holder->file_handle, diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp index 08821042a4..c4bcf7a2e4 100644 --- a/src/nfs/nfs_server_impl.cpp +++ b/src/nfs/nfs_server_impl.cpp @@ -26,7 +26,6 @@ #include "nfs/nfs_server_impl.h" -#include #include #include #include @@ -41,7 +40,6 @@ #include "utils/env.h" #include "utils/filesystem.h" #include "utils/flags.h" -#include "utils/ports.h" #include "utils/string_conv.h" #include "utils/utils.h" @@ -93,7 +91,7 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request, zauto_lock l(_handles_map_lock); auto it = _handles_map.find(file_path); // find file handle cache first if (it == _handles_map.end()) { - dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0); + dfile = file::open(file_path, file::FileOpenType::kReadOnly); if (dfile == nullptr) { LOG_ERROR("[nfs_service] open file {} failed", file_path); ::dsn::service::copy_response resp; diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp index 2782f55f11..b215be60f0 100644 --- a/src/replica/duplication/test/load_from_private_log_test.cpp +++ b/src/replica/duplication/test/load_from_private_log_test.cpp @@ -16,8 +16,8 @@ // under the License. // IWYU pragma: no_include -#include #include +// IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include @@ -54,7 +54,6 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #define BOOST_NO_CXX11_SCOPED_ENUMS #include @@ -459,7 +458,7 @@ TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file) int64_t file_size; ASSERT_TRUE(utils::filesystem::file_size( log_path, dsn::utils::FileDataType::kSensitive, file_size)); - auto wfile = file::open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(log_path, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); const char buf[] = "xxxxxx"; diff --git a/src/replica/log_file.cpp b/src/replica/log_file.cpp index e239d4dae1..973213b3a4 100644 --- a/src/replica/log_file.cpp +++ b/src/replica/log_file.cpp @@ -26,7 +26,6 @@ #include "log_file.h" -#include #include #include #include @@ -99,7 +98,7 @@ log_file::~log_file() { close(); } return nullptr; } - disk_file *hfile = file::open(path, O_RDONLY | O_BINARY, 0); + disk_file *hfile = file::open(path, file::FileOpenType::kReadOnly); if (!hfile) { err = ERR_FILE_OPERATION_FAILED; LOG_WARNING("open log file {} failed", path); @@ -155,7 +154,7 @@ log_file::~log_file() { close(); } return nullptr; } - disk_file *hfile = file::open(path, O_RDWR | O_CREAT | O_BINARY, 0666); + disk_file *hfile = file::open(path, file::FileOpenType::kWriteOnly); if (!hfile) { LOG_WARNING("create log {} failed", path); return nullptr; @@ -268,6 +267,7 @@ aio_task_ptr log_file::commit_log_block(log_block &block, log_appender pending(offset, block); return commit_log_blocks(pending, evt, tracker, std::move(callback), hash); } + aio_task_ptr log_file::commit_log_blocks(log_appender &pending, dsn::task_code evt, dsn::task_tracker *tracker, @@ -333,7 +333,7 @@ aio_task_ptr log_file::commit_log_blocks(log_appender &pending, hash); } - if (utils::FLAGS_enable_latency_tracer) { + if (dsn_unlikely(utils::FLAGS_enable_latency_tracer)) { tsk->_tracer->set_parent_point_name("commit_pending_mutations"); tsk->_tracer->set_description("log"); for (const auto &mutation : pending.mutations()) { diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.cpp b/src/replica/storage/simple_kv/simple_kv.server.impl.cpp index 81a718a45f..e78ae81efb 100644 --- a/src/replica/storage/simple_kv/simple_kv.server.impl.cpp +++ b/src/replica/storage/simple_kv/simple_kv.server.impl.cpp @@ -35,7 +35,6 @@ #include "simple_kv.server.impl.h" -#include #include #include #include @@ -61,7 +60,6 @@ #include "utils/blob.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #include "utils/utils.h" namespace dsn { @@ -249,7 +247,7 @@ ::dsn::error_code simple_kv_service_impl::sync_checkpoint() return ERR_OK; } - auto wfile = file::open(fname.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(fname, file::FileOpenType::kWriteOnly); CHECK_NOTNULL(wfile, ""); #define WRITE_DATA_SIZE(data, size) \ diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp b/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp index ae04a7f4e6..021496194b 100644 --- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp +++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp @@ -25,7 +25,6 @@ */ #include "simple_kv.server.impl.h" -#include #include #include #include @@ -49,7 +48,6 @@ #include "utils/blob.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" -#include "utils/ports.h" #include "utils/threadpool_code.h" #include "utils/utils.h" @@ -254,7 +252,7 @@ ::dsn::error_code simple_kv_service_impl::sync_checkpoint() } std::string fname = fmt::format("{}/checkpoint.{}", data_dir(), last_commit); - auto wfile = file::open(fname.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(fname, file::FileOpenType::kWriteOnly); CHECK_NOTNULL(wfile, ""); #define WRITE_DATA_SIZE(data, size) \ diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp index 980a538bf1..4f96d7d57e 100644 --- a/src/replica/test/mutation_log_test.cpp +++ b/src/replica/test/mutation_log_test.cpp @@ -26,7 +26,6 @@ #include "replica/mutation_log.h" -#include // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include @@ -70,7 +69,7 @@ using namespace ::dsn::replication; static void overwrite_file(const char *file, int offset, const void *buf, int size) { - auto wfile = file::open(file, O_RDWR | O_CREAT | O_BINARY, 0666); + auto wfile = file::open(file, file::FileOpenType::kWriteOnly); ASSERT_NE(wfile, nullptr); auto t = ::dsn::file::write(wfile, (const char *)buf, diff --git a/src/runtime/test/CMakeLists.txt b/src/runtime/test/CMakeLists.txt index c0146eb73b..01f17e8c50 100644 --- a/src/runtime/test/CMakeLists.txt +++ b/src/runtime/test/CMakeLists.txt @@ -33,6 +33,7 @@ set(MY_PROJ_LIBS gtest dsn_runtime dsn_aio dsn_meta_server + rocksdb ) set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) diff --git a/src/runtime/test/task_test.cpp b/src/runtime/test/task_test.cpp index ba4f3d8875..5716c8ce41 100644 --- a/src/runtime/test/task_test.cpp +++ b/src/runtime/test/task_test.cpp @@ -17,7 +17,6 @@ #include "runtime/task/task.h" -#include // IWYU pragma: no_include // IWYU pragma: no_include #include @@ -66,7 +65,7 @@ class task_test : public ::testing::Test static void test_signal_finished_task() { - disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0); + disk_file *fp = file::open("config-test.ini", file::FileOpenType::kReadOnly); // this aio task is enqueued into read-queue of disk_engine char buffer[128]; @@ -80,6 +79,7 @@ class task_test : public ::testing::Test // signal a finished task won't cause failure t->signal_waiters(); // signal_waiters may return false t->signal_waiters(); + ASSERT_EQ(ERR_OK, file::close(fp)); } }; diff --git a/src/test_util/test_util.cpp b/src/test_util/test_util.cpp index 0789c4678b..935d50631a 100644 --- a/src/test_util/test_util.cpp +++ b/src/test_util/test_util.cpp @@ -18,7 +18,6 @@ #include "test_util.h" #include -#include #include #include #include diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h index debeb7fa90..23b1624be7 100644 --- a/src/test_util/test_util.h +++ b/src/test_util/test_util.h @@ -19,10 +19,15 @@ #pragma once -#include +#include +#include +#include #include +#include #include +#include "fmt/core.h" +#include "runtime/api_layer1.h" #include "utils/flags.h" #include "utils/test_macros.h" @@ -43,6 +48,23 @@ class encrypt_data_test_base : public testing::TestWithParam encrypt_data_test_base() { FLAGS_encrypt_data_at_rest = GetParam(); } }; +class stop_watch +{ +public: + stop_watch() { _start_ms = dsn_now_ms(); } + void stop_and_output(const std::string &msg) + { + auto duration_ms = + std::chrono::duration_cast>( + std::chrono::milliseconds(static_cast(dsn_now_ms() - _start_ms))) + .count(); + fmt::print(stdout, "{}, cost {} ms\n", msg, duration_ms); + } + +private: + uint64_t _start_ms = 0; +}; + void create_local_test_file(const std::string &full_name, dsn::replication::file_meta *fm); #define ASSERT_EVENTUALLY(expr) \ diff --git a/src/utils/long_adder_bench/long_adder_bench.cpp b/src/utils/long_adder_bench/long_adder_bench.cpp index 93ec649c7b..0c2a12deed 100644 --- a/src/utils/long_adder_bench/long_adder_bench.cpp +++ b/src/utils/long_adder_bench/long_adder_bench.cpp @@ -20,12 +20,11 @@ #include #include #include -#include #include #include #include -#include "runtime/api_layer1.h" +#include "test_util/test_util.h" #include "utils/long_adder.h" #include "utils/ports.h" #include "utils/process_utils.h" @@ -133,7 +132,7 @@ void run_bench(int64_t num_operations, int64_t num_threads, const char *name) std::vector threads; - uint64_t start = dsn_now_ns(); + pegasus::stop_watch sw; for (int64_t i = 0; i < num_threads; i++) { threads.emplace_back([num_operations, &adder]() { for (int64_t i = 0; i < num_operations; ++i) { @@ -144,19 +143,11 @@ void run_bench(int64_t num_operations, int64_t num_threads, const char *name) for (auto &t : threads) { t.join(); } - uint64_t end = dsn_now_ns(); - - auto duration_ns = static_cast(end - start); - std::chrono::nanoseconds nano(duration_ns); - auto duration_s = std::chrono::duration_cast>(nano).count(); - - fmt::print(stdout, - "Running {} operations of {} with {} threads took {} seconds, result = {}.\n", - num_operations, - name, - num_threads, - duration_s, - adder.value()); + sw.stop_and_output(fmt::format("Running {} operations of {} with {} threads, result = {}", + num_operations, + name, + num_threads, + adder.value())); } int main(int argc, char **argv) diff --git a/src/utils/test/env.cpp b/src/utils/test/env.cpp index 813465de26..49e389729f 100644 --- a/src/utils/test/env.cpp +++ b/src/utils/test/env.cpp @@ -47,8 +47,8 @@ #include #include "test_util/test_util.h" -#include "utils/enum_helper.h" #include "utils/env.h" +#include "utils/error_code.h" #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/rand.h"