Skip to content

Commit

Permalink
quic: continued refactoring for quic_stream/quic_session
Browse files Browse the repository at this point in the history
PR-URL: #34160
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
jasnell committed Jul 5, 2020
1 parent 56dbe46 commit b1750a4
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 51 deletions.
5 changes: 4 additions & 1 deletion src/quic/node_quic_default_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ bool DefaultApplication::ReceiveStreamData(
if (!stream) {
// Shutdown the stream explicitly if the session is being closed.
if (session()->is_gracefully_closing()) {
session()->ResetStream(stream_id, NGTCP2_ERR_CLOSING);
ngtcp2_conn_shutdown_stream(
session()->connection(),
stream_id,
NGTCP2_ERR_CLOSING);
return true;
}

Expand Down
5 changes: 4 additions & 1 deletion src/quic/node_quic_http3_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,10 @@ void Http3Application::PushStream(
void Http3Application::SendStopSending(
int64_t stream_id,
uint64_t app_error_code) {
session()->ResetStream(stream_id, app_error_code);
ngtcp2_conn_shutdown_stream_read(
session()->connection(),
stream_id,
app_error_code);
}

void Http3Application::EndStream(int64_t stream_id) {
Expand Down
5 changes: 0 additions & 5 deletions src/quic/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2328,11 +2328,6 @@ void QuicSession::ResumeStream(int64_t stream_id) {
application()->ResumeStream(stream_id);
}

void QuicSession::ResetStream(int64_t stream_id, uint64_t code) {
SendSessionScope scope(this);
CHECK_EQ(ngtcp2_conn_shutdown_stream(connection(), stream_id, code), 0);
}

// Silent Close must start with the JavaScript side, which must
// clean up state, abort any still existing QuicSessions, then
// destroy the handle when done. The most important characteristic
Expand Down
31 changes: 0 additions & 31 deletions src/quic/node_quic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,37 +957,6 @@ class QuicSession : public AsyncWrap,

const StreamsMap& streams() const { return streams_; }

// ResetStream will cause ngtcp2 to queue a
// RESET_STREAM and STOP_SENDING frame, as appropriate,
// for the given stream_id. For a locally-initiated
// unidirectional stream, only a RESET_STREAM frame
// will be scheduled and the stream will be immediately
// closed. For a bi-directional stream, a STOP_SENDING
// frame will be sent.
//
// It is important to note that the QuicStream is
// not destroyed immediately following ShutdownStream.
// The sending QuicSession will not close the stream
// until the RESET_STREAM is acknowledged.
//
// Once the RESET_STREAM is sent, the QuicSession
// should not send any new frames for the stream,
// and all inbound stream frames should be discarded.
// Once ngtcp2 receives the appropriate notification
// that the RESET_STREAM has been acknowledged, the
// stream will be closed.
//
// Once the stream has been closed, it will be
// destroyed and memory will be freed. User code
// can request that a stream be immediately and
// abruptly destroyed without calling ShutdownStream.
// Likewise, an idle timeout may cause the stream
// to be silently destroyed without calling
// ShutdownStream.
void ResetStream(
int64_t stream_id,
uint64_t error_code = NGTCP2_APP_NOERROR);

void ResumeStream(int64_t stream_id);

// Submits informational headers to the QUIC Application
Expand Down
28 changes: 21 additions & 7 deletions src/quic/node_quic_stream-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,33 @@ void QuicStream::Commit(size_t amount) {
streambuf_.Seek(amount);
}

// ResetStream will cause ngtcp2 to queue a RESET_STREAM and STOP_SENDING
// frame, as appropriate, for the given stream_id. For a locally-initiated
// unidirectional stream, only a RESET_STREAM frame will be scheduled and
// the stream will be immediately closed. For a bidirectional stream, a
// STOP_SENDING frame will be sent.
void QuicStream::ResetStream(uint64_t app_error_code) {
// On calling shutdown, the stream will no longer be
// readable or writable, all any pending data in the
// streambuf_ will be canceled, and all data pending
// to be acknowledged at the ngtcp2 level will be
// abandoned.
BaseObjectPtr<QuicSession> ptr(session_);
QuicSession::SendSessionScope send_scope(session());
ngtcp2_conn_shutdown_stream(
session()->connection(),
stream_id_,
app_error_code);
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
session_->ResetStream(stream_id_, app_error_code);
streambuf_.Cancel();
streambuf_.End();
}

// StopSending will cause ngtcp2 to queue a STOP_SENDING frame if the
// stream is still inbound readable.
void QuicStream::StopSending(uint64_t app_error_code) {
QuicSession::SendSessionScope send_scope(session());
ngtcp2_conn_shutdown_stream_read(
session()->connection(),
stream_id_,
app_error_code);
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
}

void QuicStream::Schedule(Queue* queue) {
if (!stream_queue_.IsEmpty()) // Already scheduled?
return;
Expand Down
31 changes: 28 additions & 3 deletions src/quic/node_quic_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,31 @@ std::string QuicStream::diagnostic_name() const {
", " + session_->diagnostic_name() + ")";
}

void QuicStream::Destroy() {
void QuicStream::Destroy(QuicError* error) {
if (is_destroyed())
return;

QuicSession::SendSessionScope send_scope(session());

set_flag(QUICSTREAM_FLAG_DESTROYED);
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
streambuf_.End();

// In case this stream is scheduled for sending, remove it
// from the schedule queue
Unschedule();

// If there is data currently buffered in the streambuf_,
// then cancel will call out to invoke an arbitrary
// JavaScript callback (the on write callback). Within
// that callback, however, the QuicStream will no longer
// be usable to send or receive data.
streambuf_.End();
streambuf_.Cancel();
CHECK_EQ(streambuf_.length(), 0);

// Attempt to send a shutdown signal to the remote peer
ResetStream(error != nullptr ? error->code : NGTCP2_NO_ERROR);

// The QuicSession maintains a map of std::unique_ptrs to
// QuicStream instances. Removing this here will cause
// this QuicStream object to be deconstructed, so the
Expand Down Expand Up @@ -411,9 +421,11 @@ void OpenBidirectionalStream(const FunctionCallbackInfo<Value>& args) {
}

void QuicStreamDestroy(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
stream->Destroy();
QuicError error(env, args[0], args[1], QUIC_ERROR_APPLICATION);
stream->Destroy(&error);
}

void QuicStreamReset(const FunctionCallbackInfo<Value>& args) {
Expand All @@ -428,6 +440,18 @@ void QuicStreamReset(const FunctionCallbackInfo<Value>& args) {
error.code : static_cast<uint64_t>(NGTCP2_NO_ERROR));
}

void QuicStreamStopSending(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());

QuicError error(env, args[0], args[1], QUIC_ERROR_APPLICATION);

stream->StopSending(
error.family == QUIC_ERROR_APPLICATION ?
error.code : static_cast<uint64_t>(NGTCP2_NO_ERROR));
}

// Requests transmission of a block of informational headers. Not all
// QUIC Applications will support headers. If headers are not supported,
// This will set the return value to false, otherwise the return value
Expand Down Expand Up @@ -494,6 +518,7 @@ void QuicStream::Initialize(
streamt->Set(env->owner_symbol(), Null(env->isolate()));
env->SetProtoMethod(stream, "destroy", QuicStreamDestroy);
env->SetProtoMethod(stream, "resetStream", QuicStreamReset);
env->SetProtoMethod(stream, "stopSending", QuicStreamStopSending);
env->SetProtoMethod(stream, "id", QuicStreamGetID);
env->SetProtoMethod(stream, "submitInformation", QuicStreamSubmitInformation);
env->SetProtoMethod(stream, "submitHeaders", QuicStreamSubmitHeaders);
Expand Down
6 changes: 4 additions & 2 deletions src/quic/node_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class QuicStream : public AsyncWrap,
void Acknowledge(uint64_t offset, size_t datalen);

// Destroy the QuicStream and render it no longer usable.
void Destroy();
void Destroy(QuicError* error = nullptr);

// Buffers chunks of data to be written to the QUIC connection.
int DoWrite(
Expand Down Expand Up @@ -312,7 +312,9 @@ class QuicStream : public AsyncWrap,

// Resets the QUIC stream, sending a signal to the peer that
// no additional data will be transmitted for this stream.
inline void ResetStream(uint64_t app_error_code = 0);
inline void ResetStream(uint64_t app_error_code = NGTCP2_NO_ERROR);

inline void StopSending(uint64_t app_error_code = NGTCP2_NO_ERROR);

// Submits informational headers. Returns false if headers are not
// supported on the underlying QuicApplication.
Expand Down
3 changes: 2 additions & 1 deletion test/parallel/test-quic-statelessreset.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ server.on('session', common.mustCall((session) => {

server.on('close', common.mustCall(() => {
// Verify stats recording
assert.strictEqual(server.statelessResetCount, 1n);
console.log(server.statelessResetCount);
assert(server.statelessResetCount >= 1n);
}));

server.on('ready', common.mustCall(() => {
Expand Down

0 comments on commit b1750a4

Please sign in to comment.