diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index c7ecbff76cefd6..24b559c7c81f2a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -386,9 +386,63 @@ WriteStream.prototype.open = openWriteFs; WriteStream.prototype._construct = _construct; +function writeAll(data, size, pos, cb, retries = 0) { + this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => { + if (er?.code === 'EAGAIN') { + er = null; + bytesWritten = 0; + } + + if (this.destroyed || er) { + return cb(er); + } + + this.bytesWritten += bytesWritten; + + retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; + + if (retries > 5) { + cb(new Error('writev failed')); + } else if (size) { + writeAll(buffer.slice(bytesWritten), size, pos, cb, retries); + } else { + cb(); + } + }); +} + +function writevAll(chunks, size, pos, cb, retries = 0) { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => { + if (er?.code === 'EAGAIN') { + er = null; + bytesWritten = 0; + } + + if (this.destroyed || er) { + return cb(er); + } + + this.bytesWritten += bytesWritten; + + retries = bytesWritten ? 0 : retries + 1; + size -= bytesWritten; + pos += bytesWritten; + + if (retries > 5) { + cb(new Error('writev failed')); + } else if (size) { + writevAll([Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries); + } else { + cb(); + } + }); +} + WriteStream.prototype._write = function(data, encoding, cb) { this[kIsPerformingIO] = true; - this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + writeAll.call(this, data.length, data, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -396,12 +450,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined) @@ -421,7 +470,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { + writevAll.call(this, size, chunks, this.pos, (er) => { this[kIsPerformingIO] = false; if (this.destroyed) { // Tell ._destroy() that it's safe to close the fd now. @@ -429,12 +478,7 @@ WriteStream.prototype._writev = function(data, cb) { return this.emit(kIoDone, er); } - if (er) { - return cb(er); - } - - this.bytesWritten += bytes; - cb(); + cb(er); }); if (this.pos !== undefined)