Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix: crash when huge write #1099

Merged
merged 4 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/dsn/tool-api/aio_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class aio_context : public ref_counter
// filled by apps
dsn_handle_t file;
void *buffer;
uint32_t buffer_size;
uint64_t buffer_size;
uint64_t file_offset;

// filled by frameworks
Expand Down
2 changes: 1 addition & 1 deletion src/aio/aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace dsn {

aio_provider::aio_provider(disk_engine *disk) : _engine(disk) {}

void aio_provider::complete_io(aio_task *aio, error_code err, uint32_t bytes)
void aio_provider::complete_io(aio_task *aio, error_code err, uint64_t bytes)
{
_engine->complete_io(aio, err, bytes);
}
Expand Down
6 changes: 3 additions & 3 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class aio_provider

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0;

// Submits the aio_task to the underlying disk-io executor.
// This task may not be executed immediately, call `aio_task::wait`
Expand All @@ -69,7 +69,7 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

void complete_io(aio_task *aio, error_code err, uint32_t bytes);
void complete_io(aio_task *aio, error_code err, uint64_t bytes);

private:
disk_engine *_engine;
Expand Down
18 changes: 7 additions & 11 deletions src/aio/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static disk_engine_initializer disk_engine_init;
aio_task *disk_write_queue::unlink_next_workload(void *plength)
{
uint64_t next_offset = 0;
uint32_t &sz = *(uint32_t *)plength;
uint64_t &sz = *(uint64_t *)plength;
sz = 0;

aio_task *first = _hdr._first, *current = first, *last = first;
Expand Down Expand Up @@ -125,11 +125,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err,

if (err == ERR_OK) {
size_t this_size = (size_t)wk->get_aio_context()->buffer_size;
dassert(size >= this_size,
"written buffer size does not equal to input buffer's size: %d vs %d",
(int)size,
(int)this_size);

dcheck_ge(size, this_size);
wk->enqueue(err, this_size);
size -= this_size;
} else {
Expand Down Expand Up @@ -167,7 +163,7 @@ class batch_write_io_task : public aio_task
virtual void exec() override
{
auto df = (disk_file *)_tasks->get_aio_context()->file_object;
uint32_t sz;
uint64_t sz;

auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size());
if (wk) {
Expand All @@ -193,14 +189,14 @@ void disk_engine::write(aio_task *aio)
dio->engine = this;
dio->type = AIO_Write;

uint32_t sz;
uint64_t sz;
auto wk = df->write(aio, &sz);
if (wk) {
process_write(wk, sz);
}
}

void disk_engine::process_write(aio_task *aio, uint32_t sz)
void disk_engine::process_write(aio_task *aio, uint64_t sz)
{
aio_context *dio = aio->get_aio_context();

Expand Down Expand Up @@ -243,7 +239,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz)
}
}

void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes)
void disk_engine::complete_io(aio_task *aio, error_code err, uint64_t bytes)
{
if (err != ERR_OK) {
dinfo("disk operation failure with code %s, err = %s, aio_task_id = %016" PRIx64,
Expand All @@ -270,7 +266,7 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes)

// write
else {
uint32_t sz;
uint64_t sz;
auto wk = df->on_write_completed(aio, (void *)&sz, err, (size_t)bytes);
if (wk) {
process_write(wk, sz);
Expand Down
4 changes: 2 additions & 2 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class disk_engine : public utils::singleton<disk_engine>
disk_engine();
~disk_engine() = default;

void process_write(aio_task *wk, uint32_t sz);
void complete_io(aio_task *aio, error_code err, uint32_t bytes);
void process_write(aio_task *wk, uint64_t sz);
void complete_io(aio_task *aio, error_code err, uint64_t bytes);

std::unique_ptr<aio_provider> _provider;

Expand Down
18 changes: 9 additions & 9 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}

error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
/*out*/ uint64_t *processed_bytes)
{
dsn::error_code resp = ERR_OK;
uint32_t buffer_offset = 0;
uint64_t buffer_offset = 0;
do {
// ret is the written data size
uint32_t ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
(char *)aio_ctx.buffer + buffer_offset,
aio_ctx.buffer_size - buffer_offset,
aio_ctx.file_offset + buffer_offset);
auto ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
(char *)aio_ctx.buffer + buffer_offset,
aio_ctx.buffer_size - buffer_offset,
aio_ctx.file_offset + buffer_offset);
if (dsn_unlikely(ret < 0)) {
if (errno == EINTR) {
dwarn_f("write failed with errno={} and will retry it.", strerror(errno));
Expand Down Expand Up @@ -114,7 +114,7 @@ error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
}

error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
/*out*/ uint32_t *processed_bytes)
/*out*/ uint64_t *processed_bytes)
{
ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx.file),
aio_ctx.buffer,
Expand All @@ -126,7 +126,7 @@ error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
if (ret == 0) {
return ERR_HANDLE_EOF;
}
*processed_bytes = static_cast<uint32_t>(ret);
*processed_bytes = static_cast<uint64_t>(ret);
return ERR_OK;
}

Expand All @@ -148,7 +148,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk)
ADD_POINT(aio_tsk->_tracer);
aio_context *aio_ctx = aio_tsk->get_aio_context();
error_code err = ERR_UNKNOWN;
uint32_t processed_bytes = 0;
uint64_t processed_bytes = 0;
switch (aio_ctx->type) {
case AIO_Read:
err = read(*aio_ctx, &processed_bytes);
Expand Down
4 changes: 2 additions & 2 deletions src/aio/native_linux_aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class native_linux_aio_provider : public aio_provider
dsn_handle_t open(const char *file_name, int flag, int pmode) override;
error_code close(dsn_handle_t fh) override;
error_code flush(dsn_handle_t fh) override;
error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override;
error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override;

void submit_aio_task(aio_task *aio) override;
aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; }
Expand Down