Skip to content

Commit

Permalink
fs: make sure to write entire buffer
Browse files Browse the repository at this point in the history
fs.write(v) is not guaranteed to write everything in a single
call. Make sure we don't assume so.

PR-URL: #49211
Co-authored-by: Chemi Atlow <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
  • Loading branch information
ronag and atlowChemi committed Aug 24, 2023
1 parent 996f390 commit feb5b0f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 15 deletions.
80 changes: 65 additions & 15 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED,
ERR_SYSTEM_ERROR,
} = require('internal/errors').codes;
const {
deprecate,
Expand Down Expand Up @@ -392,22 +394,75 @@ WriteStream.prototype.open = openWriteFs;

WriteStream.prototype._construct = _construct;

function writeAll(data, size, pos, cb, retries = 0) {
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er || new ERR_STREAM_DESTROYED('write'));
}

this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR('write failed'));
} else if (size) {
writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
} else {
cb();
}
});
}

function writevAll(chunks, size, pos, cb, retries = 0) {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er || new ERR_STREAM_DESTROYED('writev'));
}

this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR('writev failed'));
} else if (size) {
writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
} else {
cb();
}
});
}

WriteStream.prototype._write = function(data, encoding, cb) {
this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
writeAll.call(this, data, data.length, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
return cb(er);
}

this.bytesWritten += bytes;
cb();
cb(er);
});

if (this.pos !== undefined)
Expand All @@ -427,20 +482,15 @@ WriteStream.prototype._writev = function(data, cb) {
}

this[kIsPerformingIO] = true;
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
writevAll.call(this, chunks, size, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
return cb(er);
}

this.bytesWritten += bytes;
cb();
cb(er);
});

if (this.pos !== undefined)
Expand Down
39 changes: 39 additions & 0 deletions test/parallel/test-fs-write-stream-eagain.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import * as common from '../common/index.mjs';
import tmpdir from '../common/tmpdir.js';
import assert from 'node:assert';
import fs from 'node:fs';
import { describe, it, mock } from 'node:test';
import { finished } from 'node:stream/promises';

tmpdir.refresh();
const file = tmpdir.resolve('writeStreamEAGAIN.txt');
const errorWithEAGAIN = (fd, buffer, offset, length, position, callback) => {
callback(Object.assign(new Error(), { code: 'EAGAIN' }), 0, buffer);
};

describe('WriteStream EAGAIN', { concurrency: true }, () => {
it('_write', async () => {
const mockWrite = mock.fn(fs.write);
mockWrite.mock.mockImplementationOnce(errorWithEAGAIN);
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: mockWrite,
close: common.mustCall(fs.close),
}
});
stream.end('foo');
stream.on('close', common.mustCall());
stream.on('error', common.mustNotCall());
await finished(stream);
assert.strictEqual(mockWrite.mock.callCount(), 2);
assert.strictEqual(fs.readFileSync(file, 'utf8'), 'foo');
});

it('_write', async () => {
const stream = fs.createWriteStream(file);
mock.getter(stream, 'destroyed', () => true);
stream.end('foo');
await finished(stream).catch(common.mustCall());
});
});

0 comments on commit feb5b0f

Please sign in to comment.