Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: make aio decoupled from dsn_runtime #466

Merged
merged 34 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/dsn/tool-api/file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,7 @@ extern aio_task_ptr write_vector(disk_file *file,
aio_handler &&callback,
int hash = 0);

extern aio_context_ptr prepare_aio_context(aio_task *tsk);

} // namespace file
} // namespace dsn
4 changes: 2 additions & 2 deletions include/dsn/tool-api/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ struct service_spec
bool enable_default_app_mimic;

std::string timer_factory_name;
std::string aio_factory_name;
std::string env_factory_name;
std::string lock_factory_name;
std::string lock_nr_factory_name;
Expand Down Expand Up @@ -189,7 +188,6 @@ CONFIG_FLD(
"; in this case, a [apps.mimic] section must be defined in config files");

CONFIG_FLD_STRING(timer_factory_name, "", "timer service provider")
CONFIG_FLD_STRING(aio_factory_name, "", "asynchonous file system provider")
CONFIG_FLD_STRING(env_factory_name, "", "environment provider")
CONFIG_FLD_STRING(lock_factory_name, "", "recursive exclusive lock provider")
CONFIG_FLD_STRING(lock_nr_factory_name, "", "non-recurisve exclusive lock provider")
Expand All @@ -213,4 +211,6 @@ ENUM_REG(SYS_EXIT_NORMAL)
ENUM_REG(SYS_EXIT_BREAK)
ENUM_REG(SYS_EXIT_EXCEPTION)
ENUM_END(sys_exit_type)

extern const char *FLAGS_aio_factory_name;
} // namespace dsn
11 changes: 2 additions & 9 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct __tls_dsn__
int node_id;

rpc_engine *rpc;
disk_engine *disk;
env_provider *env;

int last_worker_queue_size;
Expand Down Expand Up @@ -248,7 +247,6 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans
static int get_current_worker_index();
static const char *get_current_node_name();
static rpc_engine *get_current_rpc();
static disk_engine *get_current_disk();
static env_provider *get_current_env();

static void set_tls_dsn_context(
Expand Down Expand Up @@ -572,6 +570,7 @@ class aio_context : public ref_counter
{
}
};
typedef dsn::ref_ptr<aio_context> aio_context_ptr;

class aio_task : public task
{
Expand Down Expand Up @@ -608,7 +607,7 @@ class aio_task : public task
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

private:
dsn::ref_ptr<aio_context> _aio_ctx;
aio_context_ptr _aio_ctx;
size_t _transferred_size;
aio_handler _cb;
};
Expand Down Expand Up @@ -677,12 +676,6 @@ __inline /*static*/ rpc_engine *task::get_current_rpc()
return tls_dsn.rpc;
}

__inline /*static*/ disk_engine *task::get_current_disk()
{
check_tls_dsn();
return tls_dsn.disk;
}

__inline /*static*/ env_provider *task::get_current_env()
{
check_tls_dsn();
Expand Down
3 changes: 0 additions & 3 deletions include/dsn/tool_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ Component providers define the interface for the local components (e.g., network
#include <dsn/tool-api/task_worker.h>
#include <dsn/tool-api/admission_controller.h>
#include <dsn/tool-api/network.h>
#include <dsn/tool-api/aio_provider.h>
#include <dsn/tool-api/env_provider.h>
#include <dsn/tool-api/message_parser.h>
#include <dsn/tool-api/logging_provider.h>
Expand Down Expand Up @@ -140,8 +139,6 @@ DSN_API bool register_component_provider(const char *name,
DSN_API bool
register_component_provider(const char *name, network::factory f, ::dsn::provider_type type);
DSN_API bool
register_component_provider(const char *name, aio_provider::factory f, ::dsn::provider_type type);
DSN_API bool
register_component_provider(const char *name, env_provider::factory f, ::dsn::provider_type type);
DSN_API bool register_component_provider(const char *name,
logging_provider::factory f,
Expand Down
1 change: 1 addition & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ add_subdirectory(core)
add_subdirectory(perf_counter)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(aio)

add_library(dsn_runtime STATIC
$<TARGET_OBJECTS:dsn.core>
Expand Down
17 changes: 17 additions & 0 deletions src/core/aio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
set(MY_PROJ_NAME dsn_aio)

#Source files under CURRENT project directory will be automatically included.
#You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC "")

#Search mode for source files under CURRENT project directory ?
#"GLOB_RECURSE" for recursive search
#"GLOB" for non - recursive search
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS dsn_runtime)

#Extra files that will be installed
set(MY_BINPLACES "")

dsn_add_static_library()
13 changes: 2 additions & 11 deletions src/core/core/aio_provider.cpp → src/core/aio/aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,7 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <dsn/tool-api/aio_provider.h>
#include "aio_provider.h"
#include "disk_engine.h"

namespace dsn {
Expand All @@ -50,4 +41,4 @@ void aio_provider::complete_io(aio_task *aio,
_engine->complete_io(aio, err, bytes, delay_milliseconds);
}

} // end namespace dsn
} // namespace dsn
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

virtual void start() = 0;

protected:
DSN_API void
complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);
Expand Down
57 changes: 57 additions & 0 deletions src/core/aio/aio_task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include "core/core/task_engine.h"
#include <dsn/tool-api/file_io.h>
#include <dsn/utility/error_code.h>

namespace dsn {

aio_task::aio_task(dsn::task_code code, const aio_handler &cb, int hash, service_node *node)
: aio_task(code, aio_handler(cb), hash, node)
{
}

aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node *node)
: task(code, hash, node), _cb(std::move(cb))
{
_is_null = (_cb == nullptr);

dassert(TASK_TYPE_AIO == spec().type,
"%s is not of AIO type, please use DEFINE_TASK_CODE_AIO to define the task code",
spec().name.c_str());
set_error_code(ERR_IO_PENDING);

_aio_ctx = file::prepare_aio_context(this);
}

void aio_task::collapse()
{
if (!_unmerged_write_buffers.empty()) {
std::shared_ptr<char> buffer(dsn::utils::make_shared_array<char>(_aio_ctx->buffer_size));
char *dest = buffer.get();
for (const dsn_file_buffer_t &b : _unmerged_write_buffers) {
::memcpy(dest, b.buffer, b.size);
dest += b.size;
}
dassert(dest - buffer.get() == _aio_ctx->buffer_size,
"%u VS %u",
dest - buffer.get(),
_aio_ctx->buffer_size);
_aio_ctx->buffer = buffer.get();
_merged_write_buffer_holder.assign(std::move(buffer), 0, _aio_ctx->buffer_size);
}
}

void aio_task::enqueue(error_code err, size_t transferred_size)
{
set_error_code(err);
_transferred_size = transferred_size;

spec().on_aio_enqueue.execute(this);

task::enqueue(node()->computation()->get_pool(spec().pool_code));
}

} // namespace dsn
36 changes: 11 additions & 25 deletions src/core/core/disk_engine.cpp → src/core/aio/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/

#include "disk_engine.h"
#include "sim_aio_provider.h"
#include "core/core/service_engine.h"

using namespace dsn::utils;

Expand Down Expand Up @@ -132,25 +134,20 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,
}

//----------------- disk_engine ------------------------
disk_engine::disk_engine(service_node *node)
disk_engine::disk_engine()
{
_is_running = false;
_provider = nullptr;
_node = node;
_node = service_engine::instance().get_all_nodes().begin()->second.get();

// use native_linux_aio_provider in default
if (!strcmp(FLAGS_aio_factory_name, "dsn::tools::sim_aio_provider")) {
_provider.reset(new aio::sim_aio_provider(this, nullptr));
} else {
_provider.reset(new native_linux_aio_provider(this, nullptr));
}
}

disk_engine::~disk_engine() {}

void disk_engine::start(aio_provider *provider)
{
if (_is_running)
return;

_provider = provider;
_provider->start();
_is_running = true;
}

disk_file *disk_engine::open(const char *file_name, int flag, int pmode)
{
dsn_handle_t nh = _provider->open(file_name, flag, pmode);
Expand Down Expand Up @@ -185,11 +182,6 @@ error_code disk_engine::flush(disk_file *fh)

void disk_engine::read(aio_task *aio)
{
if (!_is_running) {
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
Expand Down Expand Up @@ -233,11 +225,6 @@ class batch_write_io_task : public aio_task

void disk_engine::write(aio_task *aio)
{
if (!_is_running) {
aio->enqueue(ERR_SERVICE_NOT_FOUND, 0);
return;
}

if (!aio->spec().on_aio_call.execute(task::get_current_task(), aio, true)) {
aio->enqueue(ERR_FILE_OPERATION_FAILED, 0);
return;
Expand Down Expand Up @@ -342,5 +329,4 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes, int
}
}
}

} // namespace dsn
25 changes: 11 additions & 14 deletions src/core/core/disk_engine.h → src/core/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@

#pragma once

#include "service_engine.h"
#include "aio_provider.h"

#include <dsn/utility/synchronize.h>
#include <dsn/tool-api/aio_provider.h>
#include <dsn/utility/work_queue.h>

namespace dsn {
Expand Down Expand Up @@ -68,14 +67,9 @@ class disk_file
work_queue<aio_task> _read_queue;
};

class disk_engine
class disk_engine : public utils::singleton<disk_engine>
{
public:
disk_engine(service_node *node);
~disk_engine();

void start(aio_provider *provider);

// asynchonous file read/write
disk_file *open(const char *file_name, int flag, int pmode);
error_code close(disk_file *fh);
Expand All @@ -84,19 +78,22 @@ class disk_engine
void write(aio_task *aio);

aio_context *prepare_aio_context(aio_task *tsk) { return _provider->prepare_aio_context(tsk); }

service_node *node() const { return _node; }

private:
friend class aio_provider;
friend class batch_write_io_task;
// the object of disk_engine must be created by `singleton::instance`
disk_engine();
~disk_engine();

void process_write(aio_task *wk, uint32_t sz);
void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);

private:
volatile bool _is_running;
aio_provider *_provider;
std::unique_ptr<aio_provider> _provider;
service_node *_node;

friend class aio_provider;
friend class batch_write_io_task;
friend class utils::singleton<disk_engine>;
};

} // namespace dsn
Loading