Skip to content

Commit

Permalink
quic: extensive refactoring of QuicStream lifecycle
Browse files Browse the repository at this point in the history
This one was a bit of a rabbit hole... but, with this set of
changes, `QuicStream` should now work with autoDestroy, supports
a promisified `close()`, and fixes a number of other internal
bugs that were spotted trying to get it to work.

PR-URL: #34351
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
jasnell committed Jul 23, 2020
1 parent cf28f8a commit 086c916
Show file tree
Hide file tree
Showing 24 changed files with 1,025 additions and 747 deletions.
8 changes: 5 additions & 3 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,12 @@ The `openStream()` method is used to create a new `QuicStream`:

```js
// Create a new bidirectional stream
const stream1 = await session.openStream();
async function createStreams(session) {
const stream1 = await session.openStream();

// Create a new unidirectional stream
const stream2 = await session.openStream({ halfOpen: true });
// Create a new unidirectional stream
const stream2 = await session.openStream({ halfOpen: true });
}
```

As suggested by the names, a bidirectional stream allows data to be sent on
Expand Down
195 changes: 114 additions & 81 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const {
validateQuicEndpointOptions,
validateCreateSecureContextOptions,
validateQuicSocketConnectOptions,
QuicStreamSharedState,
QuicSocketSharedState,
QuicSessionSharedState,
QLogStream,
Expand Down Expand Up @@ -1792,8 +1793,7 @@ class QuicSession extends EventEmitter {
const stream = this[kInternalState].streams.get(id);
if (stream === undefined)
return;

stream.destroy();
stream[kDestroy](code);
}

[kStreamReset](id, code) {
Expand Down Expand Up @@ -1968,6 +1968,8 @@ class QuicSession extends EventEmitter {
return;
state.destroyed = true;

state.idleTimeout = Boolean(this[kInternalState].state?.idleTimeout);

// Destroy any remaining streams immediately.
for (const stream of state.streams.values())
stream.destroy(error);
Expand All @@ -1982,7 +1984,6 @@ class QuicSession extends EventEmitter {
handle.stats[IDX_QUIC_SESSION_STATS_DESTROYED_AT] =
process.hrtime.bigint();
state.stats = new BigInt64Array(handle.stats);
state.idleTimeout = this[kInternalState].state.idleTimeout;

// Destroy the underlying QuicSession handle
handle.destroy(state.closeCode, state.closeFamily);
Expand Down Expand Up @@ -2530,10 +2531,12 @@ function streamOnPause() {
if (!this.destroyed)
this[kHandle].readStop();
}

class QuicStream extends Duplex {
[kInternalState] = {
closed: false,
closePromise: undefined,
closePromiseReject: undefined,
closePromiseResolve: undefined,
defaultEncoding: undefined,
didRead: false,
id: undefined,
Expand All @@ -2544,6 +2547,7 @@ class QuicStream extends Duplex {
dataRateHistogram: undefined,
dataSizeHistogram: undefined,
dataAckHistogram: undefined,
sharedState: undefined,
stats: undefined,
};

Expand All @@ -2563,7 +2567,7 @@ class QuicStream extends Duplex {
allowHalfOpen: true,
decodeStrings: true,
emitClose: true,
autoDestroy: false,
autoDestroy: true,
captureRejections: true,
});
const state = this[kInternalState];
Expand All @@ -2584,7 +2588,6 @@ class QuicStream extends Duplex {
// is still minimally usable before this but any data
// written will be buffered until kSetHandle is called.
[kSetHandle](handle) {
this[kHandle] = handle;
const state = this[kInternalState];
if (handle !== undefined) {
handle.onread = onStreamRead;
Expand All @@ -2594,23 +2597,84 @@ class QuicStream extends Duplex {
state.dataRateHistogram = new Histogram(handle.rate);
state.dataSizeHistogram = new Histogram(handle.size);
state.dataAckHistogram = new Histogram(handle.ack);
state.sharedState = new QuicStreamSharedState(handle.state);
state.session[kAddStream](state.id, this);
} else {
if (this[kHandle] !== undefined) {
this[kHandle].stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
process.hrtime.bigint();
state.stats = new BigInt64Array(this[kHandle].stats);
}
state.sharedState = undefined;
if (state.dataRateHistogram)
state.dataRateHistogram[kDestroyHistogram]();
if (state.dataSizeHistogram)
state.dataSizeHistogram[kDestroyHistogram]();
if (state.dataAckHistogram)
state.dataAckHistogram[kDestroyHistogram]();
}
this[kHandle] = handle;
}

[kStreamReset](code) {
this[kInternalState].resetCode = code | 0;
// Receiving a reset from the peer indicates that it is no
// longer sending any data, we can safely close the readable
// side of the Duplex here.
this[kInternalState].resetCode = code;
this.push(null);
this.read();
}

[kClose]() {
const state = this[kInternalState];

if (this.destroyed) {
return PromiseReject(
new ERR_INVALID_STATE('QuicStream is already destroyed'));
}

const promise = deferredClosePromise(state);
if (this.readable) {
this.push(null);
this.read();
}

if (this.writable) {
this.end();
}

return promise;
}

close() {
return this[kInternalState].closePromise || this[kClose]();
}

_destroy(error, callback) {
const state = this[kInternalState];
const handle = this[kHandle];
this[kSetHandle]();
if (handle !== undefined)
handle.destroy();
state.session[kRemoveStream](this);

if (error && typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

process.nextTick(() => callback(error));
}

[kDestroy](code) {
// TODO(@jasnell): If code is non-zero, and stream is not otherwise
// naturally shutdown, then we should destroy with an error.

// Put the QuicStream into detached mode before calling destroy
this[kSetHandle]();
this.destroy();
}

[kHeaders](headers, kind, push_id) {
// TODO(@jasnell): Convert the headers into a proper object
let name;
Expand All @@ -2635,42 +2699,6 @@ class QuicStream extends Duplex {
process.nextTick(emit.bind(this, name, headers, push_id));
}

[kClose](family, code) {
const state = this[kInternalState];
// Trigger the abrupt shutdown of the stream. If the stream is
// already no-longer readable or writable, this does nothing. If
// the stream is readable or writable, then the abort event will
// be emitted immediately after triggering the send of the
// RESET_STREAM and STOP_SENDING frames. The stream will no longer
// be readable or writable, but will not be immediately destroyed
// as we need to wait until ngtcp2 recognizes the stream as
// having been closed to be destroyed.

// Do nothing if we've already been destroyed
if (this.destroyed || state.closed)
return;

state.closed = true;

// Trigger scheduling of the RESET_STREAM and STOP_SENDING frames
// as appropriate. Notify ngtcp2 that the stream is to be shutdown.
// Once sent, the stream will be closed and destroyed as soon as
// the shutdown is acknowledged by the peer.
this[kHandle].resetStream(code, family);

// Close down the readable side of the stream
if (this.readable) {
this.push(null);
this.read();
}

// It is important to call shutdown on the handle before shutting
// down the writable side of the stream in order to prevent an
// empty STREAM frame with fin set to be sent to the peer.
if (this.writable)
this.end();
}

[kAfterAsyncWrite]({ bytes }) {
// TODO(@jasnell): Implement this
}
Expand All @@ -2681,6 +2709,7 @@ class QuicStream extends Duplex {
const initiated = this.serverInitiated ? 'server' : 'client';
return customInspect(this, {
id: this[kInternalState].id,
detached: this.detached,
direction,
initiated,
writableState: this._writableState,
Expand All @@ -2699,6 +2728,15 @@ class QuicStream extends Duplex {
// TODO(@jasnell): Implement this later
}

get detached() {
// The QuicStream is detached if it is yet destroyed
// but the underlying handle is undefined. While in
// detached mode, the QuicStream may still have
// data pending in the read queue, but writes will
// not be permitted.
return this[kHandle] === undefined;
}

get serverInitiated() {
return !!(this[kInternalState].id & 0b01);
}
Expand Down Expand Up @@ -2740,20 +2778,40 @@ class QuicStream extends Duplex {
// called. By calling shutdown, we're telling
// the native side that no more data will be
// coming so that a fin stream packet can be
// sent.
// sent, allowing any remaining final stream
// frames to be sent if necessary.
//
// When end() is called, we set the writeEnded
// flag so that we can know earlier when there
// is not going to be any more data being written
// but that is only used when end() is called
// with a final chunk to write.
_final(cb) {
const handle = this[kHandle];
if (handle === undefined) {
cb();
if (!this.detached) {
const state = this[kInternalState];
if (state.sharedState?.finSent)
return cb();
const handle = this[kHandle];
const req = new ShutdownWrap();
req.oncomplete = () => {
req.handle = undefined;
cb();
};
req.handle = handle;
if (handle.shutdown(req) === 1)
return req.oncomplete();
return;
}
return cb();
}

const req = new ShutdownWrap();
req.oncomplete = () => cb();
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1)
return cb();
end(...args) {
if (!this.destroyed) {
if (!this.detached)
this[kInternalState].sharedState.writeEnded = true;
super.end.apply(this, args);
}
return this;
}

_read(nread) {
Expand Down Expand Up @@ -2809,11 +2867,6 @@ class QuicStream extends Duplex {
this[kUpdateTimer]();
this.ownsFd = ownsFd;

// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
this._final = null;
this.end();

defaultTriggerAsyncIdScope(this[async_id_symbol],
QuicStream[kStartFilePipe],
this, fd, offset, length);
Expand All @@ -2840,6 +2893,7 @@ class QuicStream extends Duplex {
this.source.close().catch(stream.destroy.bind(stream));
else
this.source.releaseFD();
stream.end();
}

static [kOnPipedFileHandleRead]() {
Expand Down Expand Up @@ -2869,35 +2923,14 @@ class QuicStream extends Duplex {
return this[kInternalState].push_id;
}

close(code) {
this[kClose](QUIC_ERROR_APPLICATION, code);
_onTimeout() {
// TODO(@jasnell): Implement this
}

get session() {
return this[kInternalState].session;
}

_destroy(error, callback) {
const state = this[kInternalState];
const handle = this[kHandle];
// Do not use handle after this point as the underlying C++
// object has been destroyed. Any attempt to use the object
// will segfault and crash the process.
if (handle !== undefined) {
handle.stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
process.hrtime.bigint();
state.stats = new BigInt64Array(handle.stats);
handle.destroy();
}
state.session[kRemoveStream](this);
// The destroy callback must be invoked in a nextTick
process.nextTick(() => callback(error));
}

_onTimeout() {
// TODO(@jasnell): Implement this
}

get dataRateHistogram() {
return this[kInternalState].dataRateHistogram;
}
Expand Down
Loading

0 comments on commit 086c916

Please sign in to comment.