From 4307397bc35dc6a5453ba4941bb1837b48e94e27 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 12:48:56 +0100 Subject: [PATCH 01/13] fs: make sure to write entire buffer fs.write(v) is not guaranteed to write everything in a single call. Make sure we don't assume so. --- lib/internal/fs/streams.js | 51 +++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index c7ecbff76cefd6..ee04c5519b3792 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -386,9 +386,42 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; +function writeAll(data, pos, cb) { + const len = data.length; + this[kFs].write(this.fd, data, 0, len, pos, (er, bytesWritten) => { + if (this.destroyed || er) { + return cb(er); + } + + this.bytesWritten += bytesWritten; + + if (bytesWritten < len) { + writeAll(data.slice(bytesWritten), pos + bytesWritten, cb); + } else { + cb(); + } + }); +} + +function writevAll(chunks, pos, cb) { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { + if (this.destroyed || er) { + return cb(er); + } + + this.bytesWritten += bytesWritten; + + if (bytesWritten < len) { + writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb); + } else { + cb(); + } + }); +} + WriteStream.prototype._write = function(data, encoding, cb) { this[kIsPerformingIO] = true; - this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + writeAll.call(this, data, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -396,12 +429,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined) @@ -421,7 +449,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { + writevAll.call(this, chunks, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -429,12 +457,7 @@ WriteStream.prototype._writev = function(data, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined) From b6b968012bd16e26769f0766ffaa8782bdd07053 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:18:51 +0100 Subject: [PATCH 02/13] fixup --- lib/internal/fs/streams.js | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index ee04c5519b3792..086a0d4897031a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -386,9 +386,18 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; -function writeAll(data, pos, cb) { +function writeAll(data, pos, cb, retries = 0) { + if (retries > 5) { + process.nextTick(() => cb(new Error('write failed'))) + } + const len = data.length; this[kFs].write(this.fd, data, 0, len, pos, (er, bytesWritten) => { + if (er?.code === 'EAGAIN') { + er = null + bytesWritten = 0 + } + if (this.destroyed || er) { return cb(er); } @@ -396,15 +405,24 @@ function writeAll(data, pos, cb) { this.bytesWritten += bytesWritten; if (bytesWritten < len) { - writeAll(data.slice(bytesWritten), pos + bytesWritten, cb); + writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries + 1); } else { cb(); } }); } -function writevAll(chunks, pos, cb) { +function writevAll(chunks, pos, cb, retries = 0) { + if (retries > 5) { + process.nextTick(() => cb(new Error('write failed'))) + } + this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { + if (er?.code === 'EAGAIN') { + er = null + bytesWritten = 0 + } + if (this.destroyed || er) { return cb(er); } @@ -412,7 +430,7 @@ function writevAll(chunks, pos, cb) { this.bytesWritten += bytesWritten; if (bytesWritten < len) { - writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb); + writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb, retries + 1); } else { cb(); } From a5bf600b1b51a3df594851022034d965d475d264 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:23:56 +0100 Subject: [PATCH 03/13] fixup --- lib/internal/fs/streams.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 086a0d4897031a..fd9f2713067ddd 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -414,7 +414,7 @@ function writeAll(data, pos, cb, retries = 0) { function writevAll(chunks, pos, cb, retries = 0) { if (retries > 5) { - process.nextTick(() => cb(new Error('write failed'))) + process.nextTick(() => cb(new Error('writev failed'))) } this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { From 523314a50267578271fb49dfa4687a010fc53c8e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:25:24 +0100 Subject: [PATCH 04/13] fixup --- lib/internal/fs/streams.js | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index fd9f2713067ddd..2dece0ca1e1eb9 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -387,10 +387,6 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; function writeAll(data, pos, cb, retries = 0) { - if (retries > 5) { - process.nextTick(() => cb(new Error('write failed'))) - } - const len = data.length; this[kFs].write(this.fd, data, 0, len, pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { @@ -402,10 +398,16 @@ function writeAll(data, pos, cb, retries = 0) { return cb(er); } + retries = bytesWritten ? 0 : retries + 1 + + if (retries > 5) { + return cb(new Error('writev failed')); + } + this.bytesWritten += bytesWritten; if (bytesWritten < len) { - writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries + 1); + writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries); } else { cb(); } @@ -413,10 +415,6 @@ function writeAll(data, pos, cb, retries = 0) { } function writevAll(chunks, pos, cb, retries = 0) { - if (retries > 5) { - process.nextTick(() => cb(new Error('writev failed'))) - } - this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { er = null @@ -429,8 +427,14 @@ function writevAll(chunks, pos, cb, retries = 0) { this.bytesWritten += bytesWritten; + retries = bytesWritten ? 0 : retries + 1 + + if (retries > 5) { + return cb(new Error('writev failed')); + } + if (bytesWritten < len) { - writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb, retries + 1); + writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb, retries); } else { cb(); } From d4c7a902c7694352ccd1adad86e1b23691c3a394 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:26:33 +0100 Subject: [PATCH 05/13] fixup --- lib/internal/fs/streams.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 2dece0ca1e1eb9..c42bf46127bf1d 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -390,15 +390,15 @@ function writeAll(data, pos, cb, retries = 0) { const len = data.length; this[kFs].write(this.fd, data, 0, len, pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { - er = null - bytesWritten = 0 + er = null; + bytesWritten = 0; } if (this.destroyed || er) { return cb(er); } - retries = bytesWritten ? 0 : retries + 1 + retries = bytesWritten ? 0 : retries + 1; if (retries > 5) { return cb(new Error('writev failed')); @@ -417,8 +417,8 @@ function writeAll(data, pos, cb, retries = 0) { function writevAll(chunks, pos, cb, retries = 0) { this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { - er = null - bytesWritten = 0 + er = null; + bytesWritten = 0; } if (this.destroyed || er) { @@ -427,7 +427,7 @@ function writevAll(chunks, pos, cb, retries = 0) { this.bytesWritten += bytesWritten; - retries = bytesWritten ? 0 : retries + 1 + retries = bytesWritten ? 0 : retries + 1; if (retries > 5) { return cb(new Error('writev failed')); From d0afc94cf53d0bbf1195678011b4959a51ed8dc5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:35:54 +0100 Subject: [PATCH 06/13] fixuP --- lib/internal/fs/promises.js | 1 + lib/internal/fs/streams.js | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 868c7df2f1ed79..58b694cbe2ec45 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -578,6 +578,7 @@ async function write(handle, buffer, offset, length, position) { const bytesWritten = (await binding.writeBuffer(handle.fd, buffer, offset, length, position, kUsePromises)) || 0; + return { bytesWritten, buffer }; } diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index c42bf46127bf1d..38b0aa061f584f 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -415,6 +415,10 @@ function writeAll(data, pos, cb, retries = 0) { } function writevAll(chunks, pos, cb, retries = 0) { + let len = 0; + for (const chunk of chunks) { + len += Buffer.byteLength(chunk); + } this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { er = null; From 43f252907fb23663cdc3663e82191a8c476d6459 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:37:48 +0100 Subject: [PATCH 07/13] fixuP --- lib/internal/fs/streams.js | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 38b0aa061f584f..8152e5bab8977e 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -387,8 +387,7 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; function writeAll(data, pos, cb, retries = 0) { - const len = data.length; - this[kFs].write(this.fd, data, 0, len, pos, (er, bytesWritten) => { + this[kFs].write(this.fd, data, 0, data.length, pos, (er, bytesWritten, buffer) => { if (er?.code === 'EAGAIN') { er = null; bytesWritten = 0; @@ -398,13 +397,15 @@ function writeAll(data, pos, cb, retries = 0) { return cb(er); } + this.bytesWritten += bytesWritten; + retries = bytesWritten ? 0 : retries + 1; if (retries > 5) { return cb(new Error('writev failed')); } - this.bytesWritten += bytesWritten; + const len = Byffer.byteLength(data); if (bytesWritten < len) { writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries); @@ -415,11 +416,7 @@ function writeAll(data, pos, cb, retries = 0) { } function writevAll(chunks, pos, cb, retries = 0) { - let len = 0; - for (const chunk of chunks) { - len += Buffer.byteLength(chunk); - } - this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten) => { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { if (er?.code === 'EAGAIN') { er = null; bytesWritten = 0; @@ -437,8 +434,13 @@ function writevAll(chunks, pos, cb, retries = 0) { return cb(new Error('writev failed')); } + let len = 0; + for (const buf of buffers) { + len += Buffer.byteLength(buf); + } + if (bytesWritten < len) { - writevAll([Buffer.concat(chunks).slice(bytesWritten)], pos + bytesWritten, cb, retries); + writevAll([Buffer.concat(buffers).slice(bytesWritten)], pos + bytesWritten, cb, retries); } else { cb(); } From 4ff89b4c66c28c20ab4b0c1ea03d18551318bde3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:38:58 +0100 Subject: [PATCH 08/13] fixuP --- lib/internal/fs/streams.js | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 8152e5bab8977e..3d8b8f35aa021a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -386,8 +386,8 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; -function writeAll(data, pos, cb, retries = 0) { - this[kFs].write(this.fd, data, 0, data.length, pos, (er, bytesWritten, buffer) => { +function writeAll(data, size, pos, cb, retries = 0) { + this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten) => { if (er?.code === 'EAGAIN') { er = null; bytesWritten = 0; @@ -405,9 +405,7 @@ function writeAll(data, pos, cb, retries = 0) { return cb(new Error('writev failed')); } - const len = Byffer.byteLength(data); - - if (bytesWritten < len) { + if (bytesWritten < size) { writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries); } else { cb(); @@ -415,7 +413,7 @@ function writeAll(data, pos, cb, retries = 0) { }); } -function writevAll(chunks, pos, cb, retries = 0) { +function writevAll(chunks, size, pos, cb, retries = 0) { this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { if (er?.code === 'EAGAIN') { er = null; @@ -434,12 +432,7 @@ function writevAll(chunks, pos, cb, retries = 0) { return cb(new Error('writev failed')); } - let len = 0; - for (const buf of buffers) { - len += Buffer.byteLength(buf); - } - - if (bytesWritten < len) { + if (bytesWritten < size) { writevAll([Buffer.concat(buffers).slice(bytesWritten)], pos + bytesWritten, cb, retries); } else { cb(); @@ -449,7 +442,7 @@ function writevAll(chunks, pos, cb, retries = 0) { WriteStream.prototype._write = function(data, encoding, cb) { this[kIsPerformingIO] = true; - writeAll.call(this, data, this.pos, (er) => { + writeAll.call(this, data.length, data, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -477,7 +470,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - writevAll.call(this, chunks, this.pos, (er) => { + writevAll.call(this, size, chunks, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. From 38695611f266ff5566e78f7027fdee67f42450b5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:39:06 +0100 Subject: [PATCH 09/13] fixuP --- lib/internal/fs/streams.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 3d8b8f35aa021a..2314aa79b4d8d9 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -387,7 +387,7 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; function writeAll(data, size, pos, cb, retries = 0) { - this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten) => { + this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => { if (er?.code === 'EAGAIN') { er = null; bytesWritten = 0; @@ -406,7 +406,7 @@ function writeAll(data, size, pos, cb, retries = 0) { } if (bytesWritten < size) { - writeAll(data.slice(bytesWritten), pos + bytesWritten, cb, retries); + writeAll(buffer.slice(bytesWritten), pos + bytesWritten, cb, retries); } else { cb(); } From ab9570862c0ed87a515aba1b4b16839f384d5064 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:42:48 +0100 Subject: [PATCH 10/13] fixuP --- lib/internal/fs/streams.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 2314aa79b4d8d9..5868fe1782125c 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -406,7 +406,7 @@ function writeAll(data, size, pos, cb, retries = 0) { } if (bytesWritten < size) { - writeAll(buffer.slice(bytesWritten), pos + bytesWritten, cb, retries); + writeAll(buffer.slice(bytesWritten), size - bytesWritten, pos + bytesWritten, cb, retries); } else { cb(); } @@ -433,7 +433,7 @@ function writevAll(chunks, size, pos, cb, retries = 0) { } if (bytesWritten < size) { - writevAll([Buffer.concat(buffers).slice(bytesWritten)], pos + bytesWritten, cb, retries); + writevAll([Buffer.concat(buffers).slice(bytesWritten)], size - bytesWritten, pos + bytesWritten, cb, retries); } else { cb(); } From 43d902021468a27409648e2331dac99395faf4d6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:43:38 +0100 Subject: [PATCH 11/13] fixuP --- lib/internal/fs/streams.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 5868fe1782125c..99346078335d9c 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -405,8 +405,11 @@ function writeAll(data, size, pos, cb, retries = 0) { return cb(new Error('writev failed')); } - if (bytesWritten < size) { - writeAll(buffer.slice(bytesWritten), size - bytesWritten, pos + bytesWritten, cb, retries); + size -= bytesWritten + pos += bytesWritten + + if (size) { + writeAll(buffer.slice(bytesWritten), size, pos, cb, retries); } else { cb(); } @@ -432,8 +435,11 @@ function writevAll(chunks, size, pos, cb, retries = 0) { return cb(new Error('writev failed')); } - if (bytesWritten < size) { - writevAll([Buffer.concat(buffers).slice(bytesWritten)], size - bytesWritten, pos + bytesWritten, cb, retries); + size -= bytesWritten + pos += bytesWritten + + if (size) { + writevAll([Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); } else { cb(); } From 09b79941a3c605b9aa903ba09abcc7cee1eda1cc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 22 Mar 2022 14:44:31 +0100 Subject: [PATCH 12/13] fixuP --- lib/internal/fs/streams.js | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 99346078335d9c..24b559c7c81f2a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -400,15 +400,12 @@ function writeAll(data, size, pos, cb, retries = 0) { this.bytesWritten += bytesWritten; retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; if (retries > 5) { - return cb(new Error('writev failed')); - } - - size -= bytesWritten - pos += bytesWritten - - if (size) { + cb(new Error('writev failed')); + } else if (size) { writeAll(buffer.slice(bytesWritten), size, pos, cb, retries); } else { cb(); @@ -430,15 +427,12 @@ function writevAll(chunks, size, pos, cb, retries = 0) { this.bytesWritten += bytesWritten; retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; if (retries > 5) { - return cb(new Error('writev failed')); - } - - size -= bytesWritten - pos += bytesWritten - - if (size) { + cb(new Error('writev failed')); + } else if (size) { writevAll([Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); } else { cb(); From be950ef27d9afb34ad7086299e3fd2f9cd96e7d1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 23 Mar 2022 06:09:27 +0100 Subject: [PATCH 13/13] Update lib/internal/fs/promises.js Co-authored-by: Antoine du Hamel --- lib/internal/fs/promises.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 58b694cbe2ec45..868c7df2f1ed79 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -578,7 +578,6 @@ async function write(handle, buffer, offset, length, position) { const bytesWritten = (await binding.writeBuffer(handle.fd, buffer, offset, length, position, kUsePromises)) || 0; - return { bytesWritten, buffer }; }