Skip to content

Commit

Permalink
perf: optimizing write latency using independent IO queues replace of…
Browse files Browse the repository at this point in the history
… libaio (#633)
  • Loading branch information
foreverneverer authored Oct 19, 2020
1 parent 2c7148d commit a975bd6
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 182 deletions.
4 changes: 2 additions & 2 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_WRITE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_READ, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_AIO_IMMEDIATE_CALLBACK, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
Expand Down Expand Up @@ -183,6 +182,7 @@ MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_SLOG
Expand Down
6 changes: 3 additions & 3 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
Expand All @@ -69,9 +71,7 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

protected:
DSN_API void
complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);
void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);

private:
disk_engine *_engine;
Expand Down
1 change: 1 addition & 0 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "aio_provider.h"

#include <dsn/tool_api.h>
#include <dsn/utility/synchronize.h>
#include <dsn/utility/work_queue.h>

Expand Down
186 changes: 47 additions & 139 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,13 @@

#include "native_linux_aio_provider.h"

#include <fcntl.h>
#include <cstdlib>
#include <dsn/tool-api/async_calls.h>

namespace dsn {

native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk)
{
memset(&_ctx, 0, sizeof(_ctx));
auto ret = io_setup(128, &_ctx); // 128 concurrent events
dassert(ret == 0, "io_setup error, ret = %d", ret);

_is_running = true;
_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);
get_event();
});
}

native_linux_aio_provider::~native_linux_aio_provider()
{
if (!_is_running) {
return;
}
_is_running = false;

auto ret = io_destroy(_ctx);
dassert(ret == 0, "io_destroy error, ret = %d", ret);
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk) {}

_worker.join();
}
native_linux_aio_provider::~native_linux_aio_provider() {}

dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode)
{
Expand Down Expand Up @@ -86,144 +63,75 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}
}

aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
{
return new linux_disk_aio_context(tsk);
ssize_t ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
aio_ctx.buffer,
aio_ctx.buffer_size,
aio_ctx.file_offset);
if (ret < 0) {
return ERR_FILE_OPERATION_FAILED;
}
*processed_bytes = static_cast<uint32_t>(ret);
return ERR_OK;
}

void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { aio_internal(aio_tsk, true); }

void native_linux_aio_provider::get_event()
error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
{
struct io_event events[1];
int ret;

task::set_tls_dsn_context(node(), nullptr);

const char *name = ::dsn::tools::get_service_node_name(node());
char buffer[128];
sprintf(buffer, "%s.aio", name);
task_worker::set_name(buffer);

while (true) {
if (dsn_unlikely(!_is_running.load(std::memory_order_relaxed))) {
break;
}
ret = io_getevents(_ctx, 1, 1, events, NULL);
if (ret > 0) // should be 1
{
dassert(ret == 1, "io_getevents returns %d", ret);
struct iocb *io = events[0].obj;
complete_aio(io, static_cast<int>(events[0].res), static_cast<int>(events[0].res2));
} else {
// on error it returns a negated error number (the negative of one of the values listed
// in ERRORS
dwarn("io_getevents returns %d, you probably want to try on another machine:-(", ret);
}
ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx.file),
aio_ctx.buffer,
aio_ctx.buffer_size,
aio_ctx.file_offset);
if (ret < 0) {
return ERR_FILE_OPERATION_FAILED;
}
if (ret == 0) {
return ERR_HANDLE_EOF;
}
*processed_bytes = static_cast<uint32_t>(ret);
return ERR_OK;
}

void native_linux_aio_provider::complete_aio(struct iocb *io, int bytes, int err)
void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk)
{
linux_disk_aio_context *aio = CONTAINING_RECORD(io, linux_disk_aio_context, cb);
error_code ec;
if (err != 0) {
derror("aio error, err = %s", strerror(err));
ec = ERR_FILE_OPERATION_FAILED;
} else {
ec = bytes > 0 ? ERR_OK : ERR_HANDLE_EOF;
}

if (!aio->evt) {
aio_task *aio_ptr(aio->tsk);
aio->this_->complete_io(aio_ptr, ec, bytes);
} else {
aio->err = ec;
aio->bytes = bytes;
aio->evt->notify();
}
tasking::enqueue(aio_tsk->code(),
aio_tsk->tracker(),
[=]() { aio_internal(aio_tsk, true); },
aio_tsk->hash());
}

error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk,
bool async,
/*out*/ uint32_t *pbytes /*= nullptr*/)
{
struct iocb *cbs[1];
linux_disk_aio_context *aio;
int ret;

aio = (linux_disk_aio_context *)aio_tsk->get_aio_context();

memset(&aio->cb, 0, sizeof(aio->cb));

aio->this_ = this;

switch (aio->type) {
aio_context *aio_ctx = aio_tsk->get_aio_context();
error_code err = ERR_UNKNOWN;
uint32_t processed_bytes = 0;
switch (aio_ctx->type) {
case AIO_Read:
io_prep_pread(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
err = read(*aio_ctx, &processed_bytes);
break;
case AIO_Write:
if (aio->buffer) {
io_prep_pwrite(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
} else {
int iovcnt = aio->write_buffer_vec->size();
struct iovec *iov = (struct iovec *)alloca(sizeof(struct iovec) * iovcnt);
for (int i = 0; i < iovcnt; i++) {
const dsn_file_buffer_t &buf = aio->write_buffer_vec->at(i);
iov[i].iov_base = buf.buffer;
iov[i].iov_len = buf.size;
}
io_prep_pwritev(
&aio->cb, static_cast<int>((ssize_t)aio->file), iov, iovcnt, aio->file_offset);
}
err = write(*aio_ctx, &processed_bytes);
break;
default:
derror("unknown aio type %u", static_cast<int>(aio->type));
return err;
}

if (!async) {
aio->evt = new utils::notify_event();
aio->err = ERR_OK;
aio->bytes = 0;
if (pbytes) {
*pbytes = processed_bytes;
}

cbs[0] = &aio->cb;
ret = io_submit(_ctx, 1, cbs);

if (ret != 1) {
if (ret < 0)
derror("io_submit error, ret = %d", ret);
else
derror("could not sumbit IOs, ret = %d", ret);

if (async) {
complete_io(aio_tsk, ERR_FILE_OPERATION_FAILED, 0);
} else {
delete aio->evt;
aio->evt = nullptr;
}
return ERR_FILE_OPERATION_FAILED;
if (async) {
complete_io(aio_tsk, err, processed_bytes);
} else {
if (async) {
return ERR_IO_PENDING;
} else {
aio->evt->wait();
delete aio->evt;
aio->evt = nullptr;
if (pbytes != nullptr) {
*pbytes = aio->bytes;
}
return aio->err;
}
utils::notify_event notify;
notify.notify();
}

return err;
}

} // namespace dsn
38 changes: 4 additions & 34 deletions src/aio/native_linux_aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@

#include "aio_provider.h"

#include <dsn/tool_api.h>
#include <dsn/utility/synchronize.h>
#include <queue>
#include <stdio.h> /* for perror() */
#include <sys/syscall.h> /* for __NR_* definitions */
#include <libaio.h>
#include <fcntl.h> /* O_RDWR */
#include <string.h> /* memset() */
#include <inttypes.h> /* uint64_t */

namespace dsn {

class native_linux_aio_provider : public aio_provider
Expand All @@ -49,34 +39,14 @@ class native_linux_aio_provider : public aio_provider
dsn_handle_t open(const char *file_name, int flag, int pmode) override;
error_code close(dsn_handle_t fh) override;
error_code flush(dsn_handle_t fh) override;
void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override;

class linux_disk_aio_context : public aio_context
{
public:
struct iocb cb;
aio_task *tsk;
native_linux_aio_provider *this_;
utils::notify_event *evt;
error_code err;
uint32_t bytes;
error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;

explicit linux_disk_aio_context(aio_task *tsk_)
: tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0)
{
}
};
void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; }

protected:
error_code aio_internal(aio_task *aio, bool async, /*out*/ uint32_t *pbytes = nullptr);
void complete_aio(struct iocb *io, int bytes, int err);
void get_event();

private:
io_context_t _ctx;
std::atomic<bool> _is_running{false};
std::thread _worker;
};

} // namespace dsn
3 changes: 2 additions & 1 deletion src/nfs/nfs_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err,
_copy_requests_low.push(std::move(copy_requests));
}

continue_copy();
tasking::enqueue(LPC_NFS_COPY_FILE, nullptr, [this]() { continue_copy(); }, 0);
}

void nfs_client_impl::continue_copy()
Expand Down Expand Up @@ -268,6 +268,7 @@ void nfs_client_impl::continue_copy()
zauto_lock l(req->lock);
const user_request_ptr &ureq = req->file_ctx->user_req;
if (req->is_valid) {
// todo(jiashuo1) use non-block api `consumeWithBorrowNonBlocking` or `consume`
_copy_token_bucket->consumeWithBorrowAndWait(req->size);

copy_request copy_req;
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ void replica::execute_mutation(mutation_ptr &mu)
name(),
mu->name(),
static_cast<int>(mu->client_requests.size()));
ADD_POINT(mu->tracer);

error_code err = ERR_OK;
decree d = mu->data.header.decree;
Expand All @@ -244,6 +243,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}
break;
case partition_status::PS_PRIMARY: {
ADD_POINT(mu->tracer);
check_state_completeness();
dassert(_app->last_committed_decree() + 1 == d,
"app commit: %" PRId64 ", mutation decree: %" PRId64 "",
Expand Down Expand Up @@ -308,6 +308,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}

if (status() == partition_status::PS_PRIMARY) {
ADD_CUSTOM_POINT(mu->tracer, "completed");
mutation_ptr next = _primary_states.write_queue.check_possible_work(
static_cast<int>(_prepare_list->max_decree() - d));

Expand All @@ -317,7 +318,6 @@ void replica::execute_mutation(mutation_ptr &mu)
}

// update table level latency perf-counters for primary partition
ADD_CUSTOM_POINT(mu->tracer, "completed");
if (partition_status::PS_PRIMARY == status()) {
uint64_t now_ns = dsn_now_ns();
for (auto update : mu->data.updates) {
Expand Down
4 changes: 3 additions & 1 deletion src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
(int)mu->client_requests.size());
dassert(mu->data.updates.size() > 0, "");

ADD_POINT(mu->tracer);
if (_replica->status() == partition_status::PS_PRIMARY) {
ADD_POINT(mu->tracer);
}

bool has_ingestion_request = false;
int request_count = static_cast<int>(mu->client_requests.size());
Expand Down
1 change: 1 addition & 0 deletions src/replica/test/log_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class log_file_test : public replica_test_base
public:
void SetUp() override
{
utils::filesystem::remove_path(_log_dir);
utils::filesystem::create_directory(_log_dir);
_logf = log_file::create_write(_log_dir.c_str(), 1, _start_offset);
}
Expand Down

0 comments on commit a975bd6

Please sign in to comment.