Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify ReadableStream.[[Transfer]] #623

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 54 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ Instances of {{ReadableStream}} are created with the internal slots described in
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>{{[[Detached]]}}
<td>A boolean flag set to <emu-val>true</emu-val> when the stream has been transferred away to another realm
</tr>
<tr>
<td>\[[disturbed]]
<td>A boolean flag set to <emu-val>true</emu-val> when the stream has been read from or canceled
Expand Down Expand Up @@ -448,6 +452,7 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
<emu-alg>
1. Set *this*.[[state]] to `"readable"`.
1. Set *this*.[[reader]] and *this*.[[storedError]] to *undefined*.
1. Set *this*.<a idl>[[Detached]]</a> to *false*.
1. Set *this*.[[disturbed]] to *false*.
1. Set *this*.[[readableStreamController]] to *undefined*.
1. Let _type_ be ? GetV(_underlyingSource_, `"type"`).
Expand All @@ -456,10 +461,10 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *0*.
1. Set *this*.[[readableStreamController]] to ? Construct(`<a idl>ReadableByteStreamController</a>`, « *this*,
_underlyingSource_, _highWaterMark_ »).
1. Otherwise, if _type_ is *undefined*,
1. Otherwise, if _type_ is *undefined* or _type_ is `"cloning"`,
1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *1*.
1. Set *this*.[[readableStreamController]] to ? Construct(`<a idl>ReadableStreamDefaultController</a>`, « *this*,
_underlyingSource_, _size_, _highWaterMark_ »).
_underlyingSource_, _size_, _highWaterMark_, _type_ »).
1. Otherwise, throw a *RangeError* exception.
</emu-alg>

Expand Down Expand Up @@ -733,6 +738,30 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
</code></pre>
</div>

<h4 id="rs-internal-methods">Readable Stream Internal Methods</h4>

The following internal method is implemented by each {{ReadableStream}} instance.

<h5 id="rs-internal-method-transfer"><a idl lt="[[Transfer]]()">\[[Transfer]](<var>targetRealm</var>)</a></h5>

<emu-alg>
1. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError* exception.
1. If *this*.[[state]] is `"errored"`, throw a *TypeError* exception.
1. Let _controller_ be *this*.[[readableStreamController]].
1. If _controller_.[[targetRealm]] is *undefined*, throw a *TypeError* exception.
1. Let _that_ be a new instance of <a idl>ReadableStream</a> in _targetRealm_.
1. Set _that_.[[state]] to *this*.[[state]].
1. Set _that_.[[disturbed]] to *this*.[[disturbed]].
1. Set _controller_.[[controlledReadableStream]] to _that_.
1. Set _that_.[[readableStreamController]] to _controller_.
1. Let _queue_ be _controller_.[[queue]].
1. Repeat for each Record {[[value]], [[size]]} _pair_ that is an element of _queue_,
1. Set _pair_.[[value]] to ! <a abstract-op>StructuredClone</a>(_pair_.[[value]], _targetRealm_).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use ?, not !, as StructuredClone could throw

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually can't at this point, because every [[value]] here is already the result of a previous StructuredClone call. The only call to StructuredClone that could throw is at enqueue time.

1. Set _controller_.[[targetRealm]] to _targetRealm_.
1. Set _this_.<a idl>[[Detached]]</a> to *true*.
1. Return _that_.
</emu-alg>

<h3 id="rs-abstract-ops">General Readable Stream Abstract Operations</h3>

The following abstract operations, unlike most in this specification, are meant to be generally useful by other
Expand Down Expand Up @@ -781,10 +810,11 @@ readable stream has ever been read from or canceled.
<var>stream</var> )</h4>

This abstract operation is meant to be called from other specifications that may wish to query whether or not a
readable stream is <a>locked to a reader</a>.
readable stream is <a>locked to a reader</a> or has been transferred away to another realm.

<emu-alg>
1. Assert: ! IsReadableStream(_stream_) is *true*.
1. If _stream_.<a idl>[[Detached]]</a> is *true*, return *true*.
1. If _stream_.[[reader]] is *undefined*, return *false*.
1. Return *true*.
</emu-alg>
Expand Down Expand Up @@ -940,6 +970,7 @@ nothrow>ReadableStreamAddReadIntoRequest ( <var>stream</var> )</h4>
<var>reason</var> )</h4>

<emu-alg>
1. Assert: _stream_.<a idl>[[Detached]]</a> is *false*.
1. Set _stream_.[[disturbed]] to *true*.
1. If _stream_.[[state]] is `"closed"`, return <a>a promise resolved with</a> *undefined*.
1. If _stream_.[[state]] is `"errored"`, return <a>a promise rejected with</a> _stream_.[[storedError]].
Expand Down Expand Up @@ -1446,6 +1477,12 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[targetRealm]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach, of using the ReadableStreamDefaultController in one realm instead of just creating a new stream + controller pair, is very interesting, but not what I expected. Why did you choose that?

To be specific, I would have expected the algorithm to be something like

const reader = this.getReader();

const that = new ReadableStream({
  pull(c) {
    return reader.read().then(
      ({ value, done }) => {
        if (done) {
          c.close();
          return;
        }

        c.enqueue(StructuredClone(value));
      },
      e => c.error(e)
    );
  },
  // probably more
});

Copy link
Contributor Author

@isonmad isonmad Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach just seemed intuitive to me.

  • For one, I wanted to make throw-on-enqueue possible, so the clone call has to occur when a chunk first enters the controller's queue, not when it leaves it, as would happen in your algorithm.
  • The underlyingSource still lives in the original realm, and has a reference to the controller object. (Conceptually, 'controller' is sort of a term for two different things that are conflated, a public API object provided to the underlyingSource and the hidden implementation details of the stream. The API object is stuck in the original realm of the underlyingSource, but the 'implementation details' of the stream are in charge of moving things between realms, and I was conceiving that as synonymous with a single controller, I guess, not a controller+a second, entirely invisible controller with no corresponding public API object.)
  • I conceived of a transferred stream as a communications channel between two realms, but the destination realm could change when it's transferred repeatedly. You could also represent that as an ever-increasing chain of readablestreams that move messages between realms, and the implementations just do invisible optimizations as if those in between steps never happen, but this seemed a more accurate model of reality. Plus, what if those realms go away, or should go away, except for this one readablestream ferrying messages between two other realms keeping its event loop alive?
  • I thought it would also be nice to have a stream that clones its chunks without having to transfer it to a worker and back again, and there's no need for a second readablestream at all in that case.
  • It just seems like less code.

<td>Either *undefined*, or a Realm Record. If set to a Realm Record, ReadableStreamDefaultControllerEnqueue
will perform the structured clone algorithm on passed chunks, cloning them into the targetRealm
and enqueueing the result.
</tr>
<tr>
<td>\[[closeRequested]]
<td>A boolean flag indicating whether the stream has been closed by its <a>underlying source</a>, but still has
Expand Down Expand Up @@ -1494,7 +1531,7 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s
<h4 id="rs-default-controller-constructor" constructor for="ReadableStreamDefaultController"
lt="ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)">new
ReadableStreamDefaultController(<var>stream</var>, <var>underlyingSource</var>, <var>size</var>,
<var>highWaterMark</var>)</h4>
<var>highWaterMark</var>, <var>type</var>)</h4>

<div class="note">
The <code>ReadableStreamDefaultController</code> constructor cannot be used directly; it only works on a
Expand All @@ -1508,6 +1545,8 @@ ReadableStreamDefaultController(<var>stream</var>, <var>underlyingSource</var>,
1. Set *this*.[[underlyingSource]] to _underlyingSource_.
1. Set *this*.[[queue]] to a new empty List.
1. Set *this*.[[started]], *this*.[[closeRequested]], *this*.[[pullAgain]], and *this*.[[pulling]] to *false*.
1. Set *this*.[[targetRealm]] to *undefined*.
1. If _type_ is `"cloning"`, set *this*.[[targetRealm]] to the current Realm Record.
1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to
_normalizedStrategy_.[[highWaterMark]].
Expand Down Expand Up @@ -1681,8 +1720,14 @@ asserts).
1. Let _stream_ be _controller_.[[controlledReadableStream]].
1. Assert: _controller_.[[closeRequested]] is *false*.
1. Assert: _stream_.[[state]] is `"readable"`.
1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, perform
! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*).
1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*,
1. If _controller_.[[targetRealm]] is not *undefined*,
1. Let _chunk_ be <a abstract-op>StructuredClone</a>(_chunk_, _controller_.[[targetRealm]]).
1. If _chunk_ is an abrupt completion,
1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _chunk_.[[Value]]).
1. Return _chunk_.
1. Let _chunk_ be _chunk_.[[Value]].
1. Perform ! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*).
1. Otherwise,
1. Let _chunkSize_ be *1*.
1. If _controller_.[[strategySize]] is not *undefined*,
Expand Down Expand Up @@ -3589,11 +3634,13 @@ throughout the rest of this standard.
</emu-alg>

<h4 id="enqueue-value-with-size" aoid="EnqueueValueWithSize" throws>EnqueueValueWithSize ( <var>queue</var>,
<var>value</var>, <var>size</var> )</h4>
<var>value</var>, <var>size</var>, <var>targetRealm</var> )</h4>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems nicer to me if EnqueueValueWithSize stays as a naive implementation of the queue-with-sizes data structure, and the structured cloning happens elsewhere. Is doing that much uglier?

Copy link
Contributor Author

@isonmad isonmad Dec 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well for one, it would mean reimplementing the same code for WritableStream eventually, so I guess there could be a wrapper both RS and WS use to enqueue? But then there would be...a single user of EnqueueValueWithSize that didn't use the wrapper, WritableStreamDefaultControllerClose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right, I also had a preference for throwing the RangeError for invalid strategy return values before potentially throwing the DataCloneError.


<emu-alg>
1. If _targetRealm_ was not passed, let _targetRealm_ be *undefined*.
1. Let _size_ be ? ToNumber(_size_).
1. If ! IsFiniteNonNegativeNumber(_size_) is *false*, throw a *RangeError* exception.
1. If _targetRealm_ is not *undefined*, let _value_ be the result of ? StructuredClone(_value_, _targetRealm_).
1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _queue_.
</emu-alg>

Expand Down
8 changes: 7 additions & 1 deletion reference-implementation/lib/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';
const assert = require('assert');
/* structured clone is impossible to truly polyfill, but closest match */
const StructuredClone = require('realistic-structured-clone');
const { IsFiniteNonNegativeNumber } = require('./helpers.js');

exports.DequeueValue = queue => {
Expand All @@ -11,12 +13,16 @@ exports.DequeueValue = queue => {
return pair.value;
};

exports.EnqueueValueWithSize = (queue, value, size) => {
exports.EnqueueValueWithSize = (queue, value, size, targetRealm) => {
size = Number(size);
if (!IsFiniteNonNegativeNumber(size)) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (targetRealm !== undefined) {
value = StructuredClone(value/* , targetRealm*/);
}

queue.push({ value, size });

if (queue._totalSize === undefined) {
Expand Down
60 changes: 56 additions & 4 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';
const assert = require('assert');
/* structured clone is impossible to truly polyfill, but closest match */
const StructuredClone = require('realistic-structured-clone');
const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, InvokeOrNoop, PromiseInvokeOrNoop,
SameRealmTransfer, ValidateAndNormalizeQueuingStrategy, ValidateAndNormalizeHighWaterMark } =
require('./helpers.js');
Expand All @@ -12,6 +14,7 @@ const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLo

const InternalCancel = Symbol('[[Cancel]]');
const InternalPull = Symbol('[[Pull]]');
const InternalTransfer = Symbol('[[Transfer]]');

class ReadableStream {
constructor(underlyingSource = {}, { size, highWaterMark } = {}) {
Expand All @@ -21,6 +24,7 @@ class ReadableStream {
this._reader = undefined;
this._storedError = undefined;

this._Detached = false;
this._disturbed = false;

// Initialize to undefined first because the constructor of the controller checks this
Expand All @@ -33,11 +37,13 @@ class ReadableStream {
highWaterMark = 0;
}
this._readableStreamController = new ReadableByteStreamController(this, underlyingSource, highWaterMark);
} else if (type === undefined) {
} else if (type === undefined || type === 'cloning') {
if (highWaterMark === undefined) {
highWaterMark = 1;
}
this._readableStreamController = new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark);

this._readableStreamController =
new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, type);
} else {
throw new RangeError('Invalid type is specified');
}
Expand Down Expand Up @@ -251,6 +257,33 @@ class ReadableStream {
const branches = ReadableStreamTee(this, false);
return createArrayFromList(branches);
}

[InternalTransfer](targetRealm) {
if (IsReadableStreamLocked(this) === true) {
throw new TypeError('Cannot transfer a locked stream');
}
if (this._state === 'errored') {
throw new TypeError('Cannot transfer an errored stream');
}
const controller = this._readableStreamController;
if (controller._targetRealm === undefined) {
throw new TypeError('Only cloning streams are transferable');
}
/* at least approximate realm-transfer */
const that = new targetRealm.ReadableStream();
that._state = this._state;
that._disturbed = this._disturbed;

controller._controlledReadableStream = that;
that._readableStreamController = controller;
for (const pair of controller._queue) {
pair.value = StructuredClone(pair.value/* , targetRealm*/);
}
controller._targetRealm = targetRealm;
this._Detached = true;

return that;
}
}

module.exports = {
Expand Down Expand Up @@ -293,6 +326,9 @@ function IsReadableStreamDisturbed(stream) {
function IsReadableStreamLocked(stream) {
assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams');

if (stream._Detached === true) {
return true;
}
if (stream._reader === undefined) {
return false;
}
Expand Down Expand Up @@ -469,6 +505,8 @@ function ReadableStreamAddReadRequest(stream) {
}

function ReadableStreamCancel(stream, reason) {
assert(stream._Detached === false);

stream._disturbed = true;

if (stream._state === 'closed') {
Expand Down Expand Up @@ -839,7 +877,7 @@ function ReadableStreamDefaultReaderRead(reader) {
// Controllers

class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark) {
constructor(stream, underlyingSource, size, highWaterMark, type) {
if (IsReadableStream(stream) === false) {
throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance');
}
Expand All @@ -858,6 +896,12 @@ class ReadableStreamDefaultController {
this._closeRequested = false;
this._pullAgain = false;
this._pulling = false;
this._targetRealm = undefined;

if (type === 'cloning') {
/* can't access self/window/worker settings object from inside node module */
this._targetRealm = global; // set this.[[targetRealm]] to current Realm Record
}

const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this._strategySize = normalizedStrategy.size;
Expand Down Expand Up @@ -1062,6 +1106,14 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
assert(stream._state === 'readable');

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller._targetRealm !== undefined) {
try {
chunk = StructuredClone(chunk/* , controller._targetRealm */);
} catch (cloneE) {
ReadableStreamDefaultControllerErrorIfNeeded(controller, cloneE);
throw cloneE;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize = 1;
Expand All @@ -1076,7 +1128,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller._queue, chunk, chunkSize);
EnqueueValueWithSize(controller._queue, chunk, chunkSize, controller._targetRealm);
} catch (enqueueE) {
ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
throw enqueueE;
Expand Down
3 changes: 3 additions & 0 deletions reference-implementation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"Takeshi Yoshino <[email protected]>"
],
"license": "(CC0-1.0 OR MIT)",
"dependencies": {
"realistic-structured-clone": "^0.0.3"
},
"devDependencies": {
"eslint": "^3.2.2",
"glob": "^7.0.3",
Expand Down