diff --git a/include/dsn/tool-api/aio_task.h b/include/dsn/tool-api/aio_task.h index 4e65ed4098..354e0d9caf 100644 --- a/include/dsn/tool-api/aio_task.h +++ b/include/dsn/tool-api/aio_task.h @@ -55,7 +55,7 @@ class aio_context : public ref_counter // filled by apps dsn_handle_t file; void *buffer; - uint32_t buffer_size; + uint64_t buffer_size; uint64_t file_offset; // filled by frameworks diff --git a/src/aio/aio_provider.cpp b/src/aio/aio_provider.cpp index b6ff9a28e4..cecc524d7a 100644 --- a/src/aio/aio_provider.cpp +++ b/src/aio/aio_provider.cpp @@ -31,7 +31,7 @@ namespace dsn { aio_provider::aio_provider(disk_engine *disk) : _engine(disk) {} -void aio_provider::complete_io(aio_task *aio, error_code err, uint32_t bytes) +void aio_provider::complete_io(aio_task *aio, error_code err, uint64_t bytes) { _engine->complete_io(aio, err, bytes); } diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h index 1bdae55f32..d39f3a8596 100644 --- a/src/aio/aio_provider.h +++ b/src/aio/aio_provider.h @@ -59,8 +59,8 @@ class aio_provider virtual error_code close(dsn_handle_t fh) = 0; virtual error_code flush(dsn_handle_t fh) = 0; - virtual error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0; - virtual error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0; + virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0; + virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) = 0; // Submits the aio_task to the underlying disk-io executor. // This task may not be executed immediately, call `aio_task::wait` @@ -69,7 +69,7 @@ class aio_provider virtual aio_context *prepare_aio_context(aio_task *) = 0; - void complete_io(aio_task *aio, error_code err, uint32_t bytes); + void complete_io(aio_task *aio, error_code err, uint64_t bytes); private: disk_engine *_engine; diff --git a/src/aio/disk_engine.cpp b/src/aio/disk_engine.cpp index 6b233d5e41..a83be95e28 100644 --- a/src/aio/disk_engine.cpp +++ b/src/aio/disk_engine.cpp @@ -53,7 +53,7 @@ static disk_engine_initializer disk_engine_init; aio_task *disk_write_queue::unlink_next_workload(void *plength) { uint64_t next_offset = 0; - uint32_t &sz = *(uint32_t *)plength; + uint64_t &sz = *(uint64_t *)plength; sz = 0; aio_task *first = _hdr._first, *current = first, *last = first; @@ -125,11 +125,7 @@ aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code err, if (err == ERR_OK) { size_t this_size = (size_t)wk->get_aio_context()->buffer_size; - dassert(size >= this_size, - "written buffer size does not equal to input buffer's size: %d vs %d", - (int)size, - (int)this_size); - + dcheck_ge(size, this_size); wk->enqueue(err, this_size); size -= this_size; } else { @@ -167,7 +163,7 @@ class batch_write_io_task : public aio_task virtual void exec() override { auto df = (disk_file *)_tasks->get_aio_context()->file_object; - uint32_t sz; + uint64_t sz; auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), get_transferred_size()); if (wk) { @@ -193,14 +189,14 @@ void disk_engine::write(aio_task *aio) dio->engine = this; dio->type = AIO_Write; - uint32_t sz; + uint64_t sz; auto wk = df->write(aio, &sz); if (wk) { process_write(wk, sz); } } -void disk_engine::process_write(aio_task *aio, uint32_t sz) +void disk_engine::process_write(aio_task *aio, uint64_t sz) { aio_context *dio = aio->get_aio_context(); @@ -243,7 +239,7 @@ void disk_engine::process_write(aio_task *aio, uint32_t sz) } } -void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes) +void disk_engine::complete_io(aio_task *aio, error_code err, uint64_t bytes) { if (err != ERR_OK) { dinfo("disk operation failure with code %s, err = %s, aio_task_id = %016" PRIx64, @@ -270,7 +266,7 @@ void disk_engine::complete_io(aio_task *aio, error_code err, uint32_t bytes) // write else { - uint32_t sz; + uint64_t sz; auto wk = df->on_write_completed(aio, (void *)&sz, err, (size_t)bytes); if (wk) { process_write(wk, sz); diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h index 1d31650066..e35c243209 100644 --- a/src/aio/disk_engine.h +++ b/src/aio/disk_engine.h @@ -79,8 +79,8 @@ class disk_engine : public utils::singleton disk_engine(); ~disk_engine() = default; - void process_write(aio_task *wk, uint32_t sz); - void complete_io(aio_task *aio, error_code err, uint32_t bytes); + void process_write(aio_task *wk, uint64_t sz); + void complete_io(aio_task *aio, error_code err, uint64_t bytes); std::unique_ptr _provider; diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp index 6f7be7f2d7..ee5c432a91 100644 --- a/src/aio/native_linux_aio_provider.cpp +++ b/src/aio/native_linux_aio_provider.cpp @@ -72,16 +72,16 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh) } error_code native_linux_aio_provider::write(const aio_context &aio_ctx, - /*out*/ uint32_t *processed_bytes) + /*out*/ uint64_t *processed_bytes) { dsn::error_code resp = ERR_OK; - uint32_t buffer_offset = 0; + uint64_t buffer_offset = 0; do { // ret is the written data size - uint32_t ret = pwrite(static_cast((ssize_t)aio_ctx.file), - (char *)aio_ctx.buffer + buffer_offset, - aio_ctx.buffer_size - buffer_offset, - aio_ctx.file_offset + buffer_offset); + auto ret = pwrite(static_cast((ssize_t)aio_ctx.file), + (char *)aio_ctx.buffer + buffer_offset, + aio_ctx.buffer_size - buffer_offset, + aio_ctx.file_offset + buffer_offset); if (dsn_unlikely(ret < 0)) { if (errno == EINTR) { dwarn_f("write failed with errno={} and will retry it.", strerror(errno)); @@ -114,7 +114,7 @@ error_code native_linux_aio_provider::write(const aio_context &aio_ctx, } error_code native_linux_aio_provider::read(const aio_context &aio_ctx, - /*out*/ uint32_t *processed_bytes) + /*out*/ uint64_t *processed_bytes) { ssize_t ret = pread(static_cast((ssize_t)aio_ctx.file), aio_ctx.buffer, @@ -126,7 +126,7 @@ error_code native_linux_aio_provider::read(const aio_context &aio_ctx, if (ret == 0) { return ERR_HANDLE_EOF; } - *processed_bytes = static_cast(ret); + *processed_bytes = static_cast(ret); return ERR_OK; } @@ -148,7 +148,7 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk) ADD_POINT(aio_tsk->_tracer); aio_context *aio_ctx = aio_tsk->get_aio_context(); error_code err = ERR_UNKNOWN; - uint32_t processed_bytes = 0; + uint64_t processed_bytes = 0; switch (aio_ctx->type) { case AIO_Read: err = read(*aio_ctx, &processed_bytes); diff --git a/src/aio/native_linux_aio_provider.h b/src/aio/native_linux_aio_provider.h index d2a8ce4a9b..48c02f26cf 100644 --- a/src/aio/native_linux_aio_provider.h +++ b/src/aio/native_linux_aio_provider.h @@ -39,8 +39,8 @@ class native_linux_aio_provider : public aio_provider dsn_handle_t open(const char *file_name, int flag, int pmode) override; error_code close(dsn_handle_t fh) override; error_code flush(dsn_handle_t fh) override; - error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override; - error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override; + error_code write(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override; + error_code read(const aio_context &aio_ctx, /*out*/ uint64_t *processed_bytes) override; void submit_aio_task(aio_task *aio) override; aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; }