Skip to content

Commit

Permalink
feature(aio): use rocksdb APIs to re-implement the aio module
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Oct 12, 2023
1 parent ed3d31e commit cf48079
Show file tree
Hide file tree
Showing 24 changed files with 263 additions and 156 deletions.
2 changes: 1 addition & 1 deletion src/aio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ set(MY_PROJ_SRC "")
#"GLOB" for non - recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS dsn_runtime)
set(MY_PROJ_LIBS dsn_runtime rocksdb)

#Extra files that will be installed
set(MY_BINPLACES "")
Expand Down
16 changes: 12 additions & 4 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
#pragma once

#include <stdint.h>
#include <memory>
#include <string>

#include "utils/error_code.h"
#include "utils/factory_store.h"

namespace rocksdb {
class RandomAccessFile;
class RandomRWFile;
} // namespace rocksdb

namespace dsn {

class aio_context;
Expand Down Expand Up @@ -60,12 +67,13 @@ class aio_provider
explicit aio_provider(disk_engine *disk);
virtual ~aio_provider() = default;

virtual linux_fd_t open(const char *file_name, int flag, int pmode) = 0;
virtual std::unique_ptr<rocksdb::RandomAccessFile> open_read_file(const std::string &fname) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;

virtual error_code close(linux_fd_t fd) = 0;
virtual error_code flush(linux_fd_t fd) = 0;
virtual std::unique_ptr<rocksdb::RandomRWFile> open_write_file(const std::string &fname) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;
virtual error_code flush(rocksdb::RandomRWFile *rwf) = 0;
virtual error_code close(rocksdb::RandomRWFile *rwf) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
Expand Down
7 changes: 6 additions & 1 deletion src/aio/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,26 @@ aio_task *disk_write_queue::unlink_next_workload(void *plength)
return first;
}

disk_file::disk_file(linux_fd_t fd) : _fd(fd) {}
disk_file::disk_file(std::unique_ptr<rocksdb::RandomAccessFile> rf) : _read_file(std::move(rf)) {}
disk_file::disk_file(std::unique_ptr<rocksdb::RandomRWFile> wf) : _write_file(std::move(wf)) {}

aio_task *disk_file::read(aio_task *tsk)
{
CHECK(_read_file, "");
tsk->add_ref(); // release on completion, see `on_read_completed`.
return _read_queue.add_work(tsk, nullptr);
}

aio_task *disk_file::write(aio_task *tsk, void *ctx)
{
CHECK(_write_file, "");
tsk->add_ref(); // release on completion
return _write_queue.add_work(tsk, ctx);
}

aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size)
{
CHECK(_read_file, "");
CHECK(wk->next == nullptr, "");
auto ret = _read_queue.on_work_completed(wk, nullptr);
wk->enqueue(err, size);
Expand All @@ -128,6 +132,7 @@ aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t size

aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size)
{
CHECK(_write_file, "");
auto ret = _write_queue.on_work_completed(wk, ctx);

while (wk) {
Expand Down
11 changes: 8 additions & 3 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "aio/aio_task.h"
#include "aio_provider.h"
#include "rocksdb/env.h"
#include "utils/singleton.h"
#include "utils/work_queue.h"

Expand All @@ -56,17 +57,21 @@ class disk_write_queue : public work_queue<aio_task>
class disk_file
{
public:
explicit disk_file(linux_fd_t fd);
explicit disk_file(std::unique_ptr<rocksdb::RandomAccessFile> rf);
explicit disk_file(std::unique_ptr<rocksdb::RandomRWFile> wf);
aio_task *read(aio_task *tsk);
aio_task *write(aio_task *tsk, void *ctx);

aio_task *on_read_completed(aio_task *wk, error_code err, size_t size);
aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size);

linux_fd_t native_handle() const { return _fd; }
rocksdb::RandomAccessFile *rfile() const { return _read_file.get(); }
rocksdb::RandomRWFile *wfile() const { return _write_file.get(); }

private:
linux_fd_t _fd;
// TODO(yingchun): unify to use a single RandomRWFile member variable.
std::unique_ptr<rocksdb::RandomAccessFile> _read_file;
std::unique_ptr<rocksdb::RandomRWFile> _write_file;
disk_write_queue _write_queue;
work_queue<aio_task> _read_queue;
};
Expand Down
48 changes: 36 additions & 12 deletions src/aio/file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,51 @@

#include "aio/file_io.h"

#include <memory>
// IWYU pragma: no_include <algorithm>
#include <vector>

#include "aio/aio_provider.h"
#include "disk_engine.h"
#include "rocksdb/env.h"
#include "utils/fmt_logging.h"

namespace dsn {
class task_tracker;

namespace file {

/*extern*/ disk_file *open(const char *file_name, int flag, int pmode)
/*extern*/ disk_file *open(const std::string &fname, FileOpenType type)
{
auto fd = disk_engine::provider().open(file_name, flag, pmode);
if (fd.is_invalid()) {
return nullptr;
switch (type) {
case FileOpenType::kReadOnly: {
auto sf = disk_engine::provider().open_read_file(fname);
if (!sf) {
return nullptr;
}
return new disk_file(std::move(sf));
}

return new disk_file(fd);
case FileOpenType::kWriteOnly: {
auto wf = disk_engine::provider().open_write_file(fname);
if (!wf) {
return nullptr;
}
return new disk_file(std::move(wf));
}
default:
CHECK(false, "");
}
return nullptr;
}

/*extern*/ error_code close(disk_file *file)
{
error_code result = ERR_INVALID_HANDLE;
error_code result = ERR_OK;
if (file != nullptr) {
result = disk_engine::provider().close(file->native_handle());
// A read file is not needed to close.
if (file->wfile()) {
result = disk_engine::provider().close(file->wfile());
}
delete file;
file = nullptr;
}
Expand All @@ -60,11 +79,11 @@ namespace file {

/*extern*/ error_code flush(disk_file *file)
{
if (nullptr != file) {
return disk_engine::provider().flush(file->native_handle());
} else {
if (file == nullptr || file->wfile() == nullptr) {
return ERR_INVALID_HANDLE;
}

return disk_engine::provider().flush(file->wfile());
}

/*extern*/ aio_task_ptr read(disk_file *file,
Expand All @@ -84,7 +103,8 @@ namespace file {
cb->get_aio_context()->engine = &disk_engine::instance();
cb->get_aio_context()->dfile = file;

if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true)) {
if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true) ||
file->rfile() == nullptr) {
cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return cb;
}
Expand All @@ -110,6 +130,10 @@ namespace file {
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
cb->get_aio_context()->dfile = file;
if (file->wfile() == nullptr) {
cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return cb;
}

disk_engine::instance().write(cb);
return cb;
Expand Down
10 changes: 9 additions & 1 deletion src/aio/file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <stdint.h>
#include <list>
#include <string>
#include <utility>

#include "aio/aio_task.h"
Expand All @@ -47,6 +48,13 @@ class task_tracker;

namespace file {

enum class FileOpenType
{
kReadOnly = 0,
kWriteOnly
};

// TODO(yingchun): consider to return a smart pointer
/// open file
///
/// \param file_name filename of the file.
Expand All @@ -55,7 +63,7 @@ namespace file {
///
/// \return file handle
///
extern disk_file *open(const char *file_name, int flag, int pmode);
extern disk_file *open(const std::string &fname, FileOpenType type);

/// close the file handle
extern error_code close(disk_file *file);
Expand Down
Loading

0 comments on commit cf48079

Please sign in to comment.