Skip to content

Commit

Permalink
Add locking and precise flow control to WritableStream
Browse files Browse the repository at this point in the history
This fixes #319 (by adding locking and the concept of a writable stream writer) and fixes #318 (by adding precise flow control via the writer.desiredSize property). The structure parallels that introduced for ReadableStream, including the introduction of a controller class.

The infrastructure is set up to allow the future introduction of different types of writable streams (with corresponding new controllers and writers).

With this in place, the public API for writable streams is almost stable; the remaining issue is whether writers should have a ready promise, or a waitForDesiredSize() method which could in the future be customized by passing an argument (see discussions in #318).

The reference implementation's pipeTo and transform stream code has been updated, and all the tests pass, but they haven't been put in the spec yet, as their design is not yet finalized (but is much closer to now that they have a more stable WritableStream foundation).
  • Loading branch information
tyoshino committed Aug 4, 2016
1 parent 8c89f6f commit 5a27534
Show file tree
Hide file tree
Showing 20 changed files with 2,755 additions and 1,295 deletions.
922 changes: 644 additions & 278 deletions index.bs

Large diffs are not rendered by default.

291 changes: 234 additions & 57 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,95 +86,272 @@ class ReadableStream {
}

pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
// brandcheck

preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);

const source = this;

let reader;
let lastRead;
let lastWrite;
let closedPurposefully = false;
let resolvePipeToPromise;
let rejectPipeToPromise;
let _resolvePipeToPromise;
let _rejectPipeToPromise;

let _reader;
let _writer;

let _state = 'piping';

let _lastRead;
let _lastWrite;
let _allWrites;

return new Promise((resolve, reject) => {
resolvePipeToPromise = resolve;
rejectPipeToPromise = reject;
_resolvePipeToPromise = resolve;
_rejectPipeToPromise = reject;

reader = source.getReader();
_reader = source.getReader();
_writer = dest.getWriter();

reader.closed.catch(abortDest);
dest.closed.then(
() => {
if (!closedPurposefully) {
cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore'));
}
},
cancelSource
_reader.closed.catch(handleReaderClosedRejection);
_writer.closed.then(
handleWriterClosedFulfillment,
handleWriterClosedRejection
);

doPipe();
});

function releaseReader() {
//console.log('pipeTo(): releaseReader()');

_reader.releaseLock();
_reader = undefined;
}

function releaseWriter() {
//console.log('pipeTo(): releaseWriter()');

_writer.releaseLock();
_writer = undefined;
}

function done() {
//console.log('pipeTo(): done()');

assert(_reader === undefined);
assert(_writer === undefined);

_state = 'done';

_lastRead = undefined;
_lastWrite = undefined;
_allWrites = undefined;
}

function finishWithFulfillment() {
//console.log('pipeTo(): finishWithFulfillment()');

_resolvePipeToPromise(undefined);
_resolvePipeToPromise = undefined;
_rejectPipeToPromise = undefined;

done();
}

function finishWithRejection(reason) {
//console.log('pipeTo(): finishWithRejection()');

_rejectPipeToPromise(reason);
_resolvePipeToPromise = undefined;
_rejectPipeToPromise = undefined;

done();
}

function abortWriterCancelReader(reason, skipAbort, skipCancel) {
const promises = [];

if (skipAbort === false) {
_writer.abort(reason);

releaseWriter();
} else if (_lastWrite === undefined) {
releaseWriter();
} else {
promises.push(_lastWrite.then(
() => {
releaseWriter();
},
() => {
releaseWriter();
}
));
}

if (skipCancel === false) {
_reader.cancel(reason);

releaseReader();
} else if (_lastRead === undefined) {
releaseReader();
} else {
promises.push(_lastRead.then(
() => {
releaseReader();
},
() => {
releaseReader();
}
));
}

if (promises.length > 0) {
Promise.all(promises).then(
() => {
finishWithRejection(reason);
}
);
_state = 'waitingForLastReadAndOrLastWrite';
return;
}

finishWithRejection(reason);
}

function handleWriteRejection(reason) {
//console.log('pipeTo(): handleWriteRejection()');

if (_state !== 'piping') {
return;
}

abortWriterCancelReader(reason, preventAbort, preventCancel);
}

function handleReadValue(value) {
//console.log('pipeTo(): handleReadValue()');

_lastWrite = _writer.write(value);
_lastWrite.catch(handleWriteRejection);

// dest may be already errored. But proceed to write().
_allWrites = Promise.all([_allWrites, _lastWrite]);

doPipe();
}

function handleReadDone() {
//console.log('pipeTo(): handleReadDone()');

// Does not need to wait for lastRead since it occurs only on source closed.

releaseReader();

if (preventClose === false) {
//console.log('pipeTo(): Close dest');

// We don't use writer.closed. We can ensure that the microtask for writer.closed is run before any
// writer.close() call so that we can determine whether the closure was caused by the close() or ws was already
// closed before pipeTo(). It's possible but fragile.
_writer.close().then(
() => {
return _allWrites;
},
reason => {
releaseWriter();
finishWithRejection(reason);
}
).then(
() => {
releaseWriter();
finishWithFulfillment();
}
);
_state = 'closingDest';

return;
}

if (_lastWrite === undefined) {
releaseWriter()
finishWithFulfillment();
return;
}

// We don't use writer.closed. pipeTo() is responsible only for what it has written.
_lastWrite.then(
() => {
releaseWriter();
finishWithFulfillment();
},
reason => {
releaseWriter();
finishWithRejection(reason)
}
);
_state = 'waitingLastWriteOnReadableClosed';
}

function doPipe() {
lastRead = reader.read();
Promise.all([lastRead, dest.ready]).then(([{ value, done }]) => {
if (Boolean(done) === true) {
closeDest();
} else if (dest.state === 'writable') {
lastWrite = dest.write(value);
doPipe();
//console.log('pipeTo(): doPipe()');

_lastRead = _reader.read();

Promise.all([_lastRead, _writer.ready]).then(
([{ value, done }]) => {
if (_state !== 'piping') {
return;
}

if (Boolean(done) === false) {
handleReadValue(value);
} else {
handleReadDone();
}
},
() => {
// Do nothing
}
})
)
.catch(rethrowAssertionErrorRejection);

// Any failures will be handled by listening to reader.closed and dest.closed above.
// TODO: handle malicious dest.write/dest.close?
}

function cancelSource(reason) {
if (preventCancel === false) {
reader.cancel(reason);
reader.releaseLock();
rejectPipeToPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
// We don't need to handle lastRead failing because that will trigger abortDest which takes care of
// both of these.
lastRead.then(() => {
reader.releaseLock();
rejectPipeToPromise(reason);
});
function handleReaderClosedRejection(reason) {
//console.log('pipeTo(): handleReaderClosedRejection()');

if (_state !== 'piping') {
return;
}
}

function closeDest() {
// Does not need to wait for lastRead since it occurs only on source closed.
_lastRead = undefined;
abortWriterCancelReader(reason, preventAbort, true);
}

reader.releaseLock();
function handleUnexpectedWriterCloseAndError(reason) {
//console.log('pipeTo(): handleUnexpectedWriterCloseAndError()');

const destState = dest.state;
if (preventClose === false && (destState === 'waiting' || destState === 'writable')) {
closedPurposefully = true;
dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
} else if (lastWrite !== undefined) {
lastWrite.then(resolvePipeToPromise, rejectPipeToPromise);
} else {
resolvePipeToPromise();
if (_state !== 'piping') {
return;
}

_lastWrite = undefined;
abortWriterCancelReader(reason, true, preventCancel);
}

function abortDest(reason) {
// Does not need to wait for lastRead since it only occurs on source errored.
function handleWriterClosedFulfillment() {
//console.log('pipeTo(): handleWriterClosedFulfillment()');

reader.releaseLock();
handleUnexpectedWriterCloseAndError(new TypeError('dest closed unexpectedly'));
}

if (preventAbort === false) {
dest.abort(reason);
}
rejectPipeToPromise(reason);
function handleWriterClosedRejection(reason) {
//console.log('pipeTo(): handleWriterClosedRejection()');

handleUnexpectedWriterCloseAndError(reason);
}
}

Expand Down
Loading

0 comments on commit 5a27534

Please sign in to comment.