Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: various additional cleanups, fixes in Endpoint #51310

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@
'src/quic/cid.cc',
'src/quic/data.cc',
'src/quic/endpoint.cc',
'src/quic/http3.cc',
'src/quic/logstream.cc',
'src/quic/packet.cc',
'src/quic/preferredaddress.cc',
Expand All @@ -368,6 +369,7 @@
'src/quic/cid.h',
'src/quic/data.h',
'src/quic/endpoint.h',
'src/quic/http3.h',
'src/quic/logstream.h',
'src/quic/packet.h',
'src/quic/preferredaddress.h',
Expand Down
3 changes: 2 additions & 1 deletion src/debug_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ void NODE_EXTERN_PRIVATE FWrite(FILE* file, const std::string& str);
V(WASI) \
V(MKSNAPSHOT) \
V(SNAPSHOT_SERDES) \
V(PERMISSION_MODEL)
V(PERMISSION_MODEL) \
V(QUIC)

enum class DebugCategory : unsigned int {
#define V(name) name,
Expand Down
120 changes: 91 additions & 29 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

#include "application.h"
#include <async_wrap-inl.h>
#include <debug_utils-inl.h>
#include <node_bob.h>
#include <node_sockaddr-inl.h>
#include <uv.h>
#include <v8.h>
#include "defs.h"
#include "endpoint.h"
#include "http3.h"
#include "packet.h"
#include "session.h"

Expand All @@ -21,24 +24,48 @@ using v8::Value;

namespace quic {

struct Session::Application::StreamData final {
// The actual number of vectors in the struct, up to kMaxVectorCount.
size_t count = 0;
size_t remaining = 0;
// The stream identifier. If this is a negative value then no stream is
// identified.
int64_t id = -1;
int fin = 0;
ngtcp2_vec data[kMaxVectorCount]{};
ngtcp2_vec* buf = data;
BaseObjectPtr<Stream> stream;
};

// ============================================================================
// Session::Application_Options
const Session::Application_Options Session::Application_Options::kDefault = {};

Session::Application_Options::operator const nghttp3_settings() const {
// In theory, Application_Options might contain options for more than just
// HTTP/3. Here we extract only the properties that are relevant to HTTP/3.
return nghttp3_settings{
max_field_section_size,
static_cast<size_t>(qpack_max_dtable_capacity),
static_cast<size_t>(qpack_encoder_max_dtable_capacity),
static_cast<size_t>(qpack_blocked_streams),
enable_connect_protocol,
enable_datagrams,
};
}

std::string Session::Application_Options::ToString() const {
DebugIndentScope indent;
auto prefix = indent.Prefix();
std::string res("{");
res += prefix + "max header pairs: " + std::to_string(max_header_pairs);
res += prefix + "max header length: " + std::to_string(max_header_length);
res += prefix +
"max field section size: " + std::to_string(max_field_section_size);
res += prefix + "qpack max dtable capacity: " +
std::to_string(qpack_max_dtable_capacity);
res += prefix + "qpack encoder max dtable capacity: " +
std::to_string(qpack_encoder_max_dtable_capacity);
res += prefix +
"qpack blocked streams: " + std::to_string(qpack_blocked_streams);
res += prefix + "enable connect protocol: " +
(enable_connect_protocol ? std::string("yes") : std::string("no"));
res += prefix + "enable datagrams: " +
(enable_datagrams ? std::string("yes") : std::string("no"));
res += indent.Close();
return res;
}

Maybe<Session::Application_Options> Session::Application_Options::From(
Environment* env, Local<Value> value) {
if (value.IsEmpty()) {
if (value.IsEmpty() || (!value->IsUndefined() && !value->IsObject())) {
THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object");
return Nothing<Application_Options>();
}
Expand All @@ -49,11 +76,6 @@ Maybe<Session::Application_Options> Session::Application_Options::From(
return Just<Application_Options>(options);
}

if (!value->IsObject()) {
THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object");
return Nothing<Application_Options>();
}

auto params = value.As<Object>();

#define SET(name) \
Expand All @@ -63,7 +85,8 @@ Maybe<Session::Application_Options> Session::Application_Options::From(

if (!SET(max_header_pairs) || !SET(max_header_length) ||
!SET(max_field_section_size) || !SET(qpack_max_dtable_capacity) ||
!SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams)) {
!SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams) ||
!SET(enable_connect_protocol) || !SET(enable_datagrams)) {
return Nothing<Application_Options>();
}

Expand All @@ -78,16 +101,22 @@ Session::Application::Application(Session* session, const Options& options)
bool Session::Application::Start() {
// By default there is nothing to do. Specific implementations may
// override to perform more actions.
Debug(session_, "Session application started");
return true;
}

void Session::Application::AcknowledgeStreamData(Stream* stream,
size_t datalen) {
Debug(session_,
"Application acknowledging stream %" PRIi64 " data: %zu",
stream->id(),
datalen);
DCHECK_NOT_NULL(stream);
stream->Acknowledge(datalen);
}

void Session::Application::BlockStream(int64_t id) {
Debug(session_, "Application blocking stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream) stream->EmitBlocked();
}
Expand All @@ -96,6 +125,7 @@ bool Session::Application::CanAddHeader(size_t current_count,
size_t current_headers_length,
size_t this_header_length) {
// By default headers are not supported.
Debug(session_, "Application cannot add header");
return false;
}

Expand All @@ -104,33 +134,39 @@ bool Session::Application::SendHeaders(const Stream& stream,
const v8::Local<v8::Array>& headers,
HeadersFlags flags) {
// By default do nothing.
Debug(session_, "Application cannot send headers");
return false;
}

void Session::Application::ResumeStream(int64_t id) {
Debug(session_, "Application resuming stream %" PRIi64, id);
// By default do nothing.
}

void Session::Application::ExtendMaxStreams(EndpointLabel label,
Direction direction,
uint64_t max_streams) {
Debug(session_, "Application extending max streams");
// By default do nothing.
}

void Session::Application::ExtendMaxStreamData(Stream* stream,
uint64_t max_data) {
Debug(session_, "Application extending max stream data");
// By default do nothing.
}

void Session::Application::CollectSessionTicketAppData(
SessionTicket::AppData* app_data) const {
Debug(session_, "Application collecting session ticket app data");
// By default do nothing.
}

SessionTicket::AppData::Status
Session::Application::ExtractSessionTicketAppData(
const SessionTicket::AppData& app_data,
SessionTicket::AppData::Source::Flag flag) {
Debug(session_, "Application extracting session ticket app data");
// By default we do not have any application data to retrieve.
return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW
? SessionTicket::AppData::Status::TICKET_USE_RENEW
Expand All @@ -140,14 +176,16 @@ Session::Application::ExtractSessionTicketAppData(
void Session::Application::SetStreamPriority(const Stream& stream,
StreamPriority priority,
StreamPriorityFlags flags) {
Debug(
session_, "Application setting stream %" PRIi64 " priority", stream.id());
// By default do nothing.
}

StreamPriority Session::Application::GetStreamPriority(const Stream& stream) {
return StreamPriority::DEFAULT;
}

BaseObjectPtr<Packet> Session::Application::CreateStreamDataPacket() {
Packet* Session::Application::CreateStreamDataPacket() {
return Packet::Create(env(),
session_->endpoint_.get(),
session_->remote_address_,
Expand All @@ -156,24 +194,37 @@ BaseObjectPtr<Packet> Session::Application::CreateStreamDataPacket() {
}

void Session::Application::StreamClose(Stream* stream, QuicError error) {
Debug(session_,
"Application closing stream %" PRIi64 " with error %s",
stream->id(),
error);
stream->Destroy(error);
}

void Session::Application::StreamStopSending(Stream* stream, QuicError error) {
Debug(session_,
"Application stopping sending on stream %" PRIi64 " with error %s",
stream->id(),
error);
DCHECK_NOT_NULL(stream);
stream->ReceiveStopSending(error);
}

void Session::Application::StreamReset(Stream* stream,
uint64_t final_size,
QuicError error) {
Debug(session_,
"Application resetting stream %" PRIi64 " with error %s",
stream->id(),
error);
stream->ReceiveStreamReset(final_size, error);
}

void Session::Application::SendPendingData() {
Debug(session_, "Application sending pending data");
PathStorage path;

BaseObjectPtr<Packet> packet;
Packet* packet = nullptr;
uint8_t* pos = nullptr;
int err = 0;

Expand All @@ -182,6 +233,7 @@ void Session::Application::SendPendingData() {
size_t packetSendCount = 0;

const auto updateTimer = [&] {
Debug(session_, "Application updating the session timer");
ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
session_->UpdateTimer();
};
Expand Down Expand Up @@ -209,9 +261,9 @@ void Session::Application::SendPendingData() {
return session_->Close(Session::CloseMethod::SILENT);
}

if (!packet) {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (!packet) {
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
Expand Down Expand Up @@ -319,12 +371,14 @@ class DefaultApplication final : public Session::Application {
const uint8_t* data,
size_t datalen,
Stream::ReceiveDataFlags flags) override {
Debug(&session(), "Default application receiving stream data");
DCHECK_NOT_NULL(stream);
if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags);
return true;
}

int GetStreamData(StreamData* stream_data) override {
Debug(&session(), "Default application getting stream data");
DCHECK_NOT_NULL(stream_data);
// If the queue is empty, there aren't any streams with data yet
if (stream_queue_.IsEmpty()) return 0;
Expand Down Expand Up @@ -380,7 +434,10 @@ class DefaultApplication final : public Session::Application {
return 0;
}

void ResumeStream(int64_t id) override { ScheduleStream(id); }
void ResumeStream(int64_t id) override {
Debug(&session(), "Default application resuming stream %" PRIi64, id);
ScheduleStream(id);
}

bool ShouldSetFin(const StreamData& stream_data) override {
auto const is_empty = [](auto vec, size_t cnt) {
Expand All @@ -394,6 +451,7 @@ class DefaultApplication final : public Session::Application {
}

bool StreamCommit(StreamData* stream_data, size_t datalen) override {
Debug(&session(), "Default application committing stream data");
DCHECK_NOT_NULL(stream_data);
const auto consume = [](ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
ngtcp2_vec* v = *pvec;
Expand Down Expand Up @@ -425,13 +483,15 @@ class DefaultApplication final : public Session::Application {

private:
void ScheduleStream(int64_t id) {
Debug(&session(), "Default application scheduling stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream && !stream->is_destroyed()) {
stream->Schedule(&stream_queue_);
}
}

void UnscheduleStream(int64_t id) {
Debug(&session(), "Default application unscheduling stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream && !stream->is_destroyed()) stream->Unschedule();
}
Expand All @@ -440,13 +500,15 @@ class DefaultApplication final : public Session::Application {
};

std::unique_ptr<Session::Application> Session::select_application() {
// if (config.options.crypto_options.alpn == NGHTTP3_ALPN_H3)
// return std::make_unique<Http3>(session,
// config.options.application_options);

// In the future, we may end up supporting additional QUIC protocols. As they
// are added, extend the cases here to create and return them.

if (config_.options.tls_options.alpn == NGHTTP3_ALPN_H3) {
Debug(this, "Selecting HTTP/3 application");
return createHttp3Application(this, config_.options.application_options);
}

Debug(this, "Selecting default application");
return std::make_unique<DefaultApplication>(
this, config_.options.application_options);
}
Expand Down
18 changes: 17 additions & 1 deletion src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ class Session::Application : public MemoryRetainer {
protected:
inline Environment* env() const { return session_->env(); }
inline Session& session() { return *session_; }
inline const Session& session() const { return *session_; }

BaseObjectPtr<Packet> CreateStreamDataPacket();
Packet* CreateStreamDataPacket();

struct StreamData;

Expand All @@ -137,6 +138,21 @@ class Session::Application : public MemoryRetainer {
Session* session_;
};

struct Session::Application::StreamData final {
// The actual number of vectors in the struct, up to kMaxVectorCount.
size_t count = 0;
size_t remaining = 0;
// The stream identifier. If this is a negative value then no stream is
// identified.
int64_t id = -1;
int fin = 0;
ngtcp2_vec data[kMaxVectorCount]{};
ngtcp2_vec* buf = data;
BaseObjectPtr<Stream> stream;

inline operator nghttp3_vec() const { return {data[0].base, data[0].len}; }
};

} // namespace quic
} // namespace node

Expand Down
Loading