Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Priority Support #1778

Merged
merged 6 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions scripts/update-sidecar.ps1
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
<#

.SYNOPSIS
This updates/regenerates the CLOG sidecar file.

.PARAMETER Clean
Deletes the old sidecar file first.
This regenerates the CLOG sidecar file.

#>

param (
[Parameter(Mandatory = $false)]
[switch]$Clean = $false
)

Set-StrictMode -Version 'Latest'
$PSDefaultParameterValues['*:ErrorAction'] = 'Stop'

Expand All @@ -35,9 +27,7 @@ $ConfigFile = Join-Path $SrcDir "manifest" "msquic.clog_config"
$OutputDir = Join-Path $RootDir "build" "tmp"
New-Item -Path $OutputDir -ItemType Directory -Force | Out-Null

if ($Clean) {
Remove-Item $Sidecar -Force -ErrorAction Ignore | Out-Null
}
Remove-Item $Sidecar -Force -ErrorAction Ignore | Out-Null

foreach ($File in $Files) {
clog -p windows --scopePrefix "QUIC" -s $Sidecar -c $ConfigFile -i $File --outputDirectory "$OutputDir"
Expand Down
1 change: 0 additions & 1 deletion src/core/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,6 @@ QuicCryptoCustomCertValidationComplete(
_In_ BOOLEAN Result
)
{
CXPLAT_TEL_ASSERT(Crypto->CertValidationPending);
if (!Crypto->CertValidationPending) {
return;
}
Expand Down
58 changes: 54 additions & 4 deletions src/core/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,19 @@ QuicSendQueueFlushForStream(
// Not previously queued, so add the stream to the end of the queue.
//
CXPLAT_DBG_ASSERT(Stream->SendLink.Flink == NULL);
CxPlatListInsertTail(&Send->SendStreams, &Stream->SendLink);
CXPLAT_LIST_ENTRY* Entry = Send->SendStreams.Blink;
while (Entry != &Send->SendStreams) {
//
// Search back to front for the right place (based on priority) to
// insert the stream.
//
if (Stream->SendPriority <=
CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
Entry = Entry->Blink;
}
CxPlatListInsertHead(Entry, &Stream->SendLink); // Insert after current Entry
QuicStreamAddRef(Stream, QUIC_STREAM_REF_SEND);
}

Expand All @@ -172,6 +184,31 @@ QuicSendQueueFlushForStream(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicSendUpdateStreamPriority(
_In_ QUIC_SEND* Send,
_In_ QUIC_STREAM* Stream
)
{
CXPLAT_DBG_ASSERT(Stream->SendLink.Flink != NULL);
CxPlatListEntryRemove(&Stream->SendLink);

CXPLAT_LIST_ENTRY* Entry = Send->SendStreams.Blink;
while (Entry != &Send->SendStreams) {
//
// Search back to front for the right place (based on priority) to
// insert the stream.
//
if (Stream->SendPriority <=
CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
Entry = Entry->Blink;
}
CxPlatListInsertHead(Entry, &Stream->SendLink); // Insert after current Entry
}

#if DEBUG
_IRQL_requires_max_(DISPATCH_LEVEL)
void
Expand Down Expand Up @@ -888,10 +925,23 @@ QuicSendGetNextStream(

if (Connection->State.UseRoundRobinStreamScheduling) {
//
// Move the stream to the end of the queue.
// Move the stream after any streams of the same priority. Start
// with the "next" entry in the list and keep going until the
// next entry's priority is less. Then move the stream before
// that entry.
//
CxPlatListEntryRemove(&Stream->SendLink);
CxPlatListInsertTail(&Send->SendStreams, &Stream->SendLink);
CXPLAT_LIST_ENTRY* LastEntry = Stream->SendLink.Flink;
while (Stream->SendLink.Flink != &Send->SendStreams) {
if (Stream->SendPriority >
CXPLAT_CONTAINING_RECORD(LastEntry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
LastEntry = LastEntry->Flink;
}
if (LastEntry->Blink != &Stream->SendLink) {
CxPlatListEntryRemove(&Stream->SendLink);
CxPlatListInsertTail(LastEntry, &Stream->SendLink);
}

*PacketCount = QUIC_STREAM_SEND_BATCH_COUNT;

Expand Down
10 changes: 10 additions & 0 deletions src/core/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ QuicSendQueueFlushForStream(
_In_ BOOLEAN DelaySend
);

//
// Updates the stream's order in response to a priority change.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicSendUpdateStreamPriority(
_In_ QUIC_SEND* Send,
_In_ QUIC_STREAM* Stream
);

//
// Tries to drain all queued data that needs to be sent. Returns TRUE if all the
// data was drained.
Expand Down
71 changes: 64 additions & 7 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ QuicStreamInitialize(
Stream->RecvMaxLength = UINT64_MAX;
Stream->RefCount = 1;
Stream->SendRequestsTail = &Stream->SendRequests;
Stream->SendPriority = QUIC_STREAM_PRIORITY_DEFAULT;
CxPlatDispatchLockInitialize(&Stream->ApiSendRequestLock);
CxPlatRefInitialize(&Stream->RefCount);
QuicRangeInitialize(
Expand Down Expand Up @@ -538,11 +539,49 @@ QuicStreamParamSet(
const void* Buffer
)
{
UNREFERENCED_PARAMETER(Stream);
UNREFERENCED_PARAMETER(Param);
UNREFERENCED_PARAMETER(BufferLength);
UNREFERENCED_PARAMETER(Buffer);
return QUIC_STATUS_INVALID_PARAMETER;
QUIC_STATUS Status;

switch (Param) {
case QUIC_PARAM_STREAM_ID:
case QUIC_PARAM_STREAM_0RTT_LENGTH:
case QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;

case QUIC_PARAM_STREAM_PRIORITY: {

if (BufferLength != sizeof(Stream->SendPriority)) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

if (Stream->SendPriority != *(uint16_t*)Buffer) {
Stream->SendPriority = *(uint16_t*)Buffer;

QuicTraceLogStreamInfo(
UpdatePriority,
Stream,
"New send priority = %hu",
Stream->SendPriority);

if (Stream->Flags.Started && Stream->SendFlags != 0) {
//
// Update the stream's place in the send queue if necessary.
//
QuicSendUpdateStreamPriority(&Stream->Connection->Send, Stream);
}
}

Status = QUIC_STATUS_SUCCESS;
break;
}

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

return Status;
}

QUIC_STATUS
Expand All @@ -556,8 +595,7 @@ QuicStreamParamGet(
{
QUIC_STATUS Status;

switch (Param)
{
switch (Param) {
case QUIC_PARAM_STREAM_ID:

if (*BufferLength < sizeof(Stream->ID)) {
Expand Down Expand Up @@ -627,6 +665,25 @@ QuicStreamParamGet(
Status = QUIC_STATUS_SUCCESS;
break;

case QUIC_PARAM_STREAM_PRIORITY:

if (*BufferLength < sizeof(Stream->SendPriority)) {
*BufferLength = sizeof(Stream->SendPriority);
Status = QUIC_STATUS_BUFFER_TOO_SMALL;
break;
}

if (Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

*BufferLength = sizeof(Stream->SendPriority);
*(uint16_t*)Buffer = Stream->SendPriority;

Status = QUIC_STATUS_SUCCESS;
break;

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
Expand Down
8 changes: 8 additions & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef struct QUIC_CONNECTION QUIC_CONNECTION;
QUIC_SEND_FLAG_BUFFERED \
)

#define QUIC_STREAM_PRIORITY_DEFAULT 0x7FFF // Medium priority by default

//
// Tracks the data queued up for sending by an application.
//
Expand Down Expand Up @@ -342,6 +344,12 @@ typedef struct QUIC_STREAM {
//
QUIC_RANGE SparseAckRanges;

//
// The relative priority between the different streams that determines the
// order that queued data will be sent out.
//
uint16_t SendPriority;

//
// Recv State
//
Expand Down
1 change: 1 addition & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ typedef struct QUIC_SCHANNEL_CONTEXT_ATTRIBUTE_W {
#define QUIC_PARAM_STREAM_ID 0x1C000000 // QUIC_UINT62
#define QUIC_PARAM_STREAM_0RTT_LENGTH 0x1C000001 // uint64_t
#define QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE 0x1C000002 // uint64_t - bytes
#define QUIC_PARAM_STREAM_PRIORITY 0x1C000003 // uint16_t - 0 (low) to 0xFFFF (high) - 0x7FFF (default)

typedef
_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down
41 changes: 41 additions & 0 deletions src/inc/msquic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,47 @@ struct MsQuicStream {
return MsQuic->StreamReceiveSetEnabled(Handle, IsEnabled ? TRUE : FALSE);
}

QUIC_STATUS
GetID(_Out_ QUIC_UINT62* ID) const noexcept {
uint32_t Size = sizeof(*ID);
return
MsQuic->GetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_ID,
&Size,
ID);
}

QUIC_UINT62 ID() const noexcept {
QUIC_UINT62 ID;
GetID(&ID);
return ID;
}

QUIC_STATUS
SetPriority(_In_ uint16_t Priority) noexcept {
return
MsQuic->SetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_PRIORITY,
sizeof(Priority),
&Priority);
}

QUIC_STATUS
GetPriority(_Out_ uint16_t* Priority) const noexcept {
uint32_t Size = sizeof(*Priority);
return
MsQuic->GetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_PRIORITY,
&Size,
Priority);
}

QUIC_STATUS GetInitStatus() const noexcept { return InitStatus; }
bool IsValid() const { return QUIC_SUCCEEDED(InitStatus); }
MsQuicStream(MsQuicStream& other) = delete;
Expand Down
20 changes: 20 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -6720,6 +6720,22 @@
],
"macroName": "QuicTraceLogStreamWarning"
},
"UpdatePriority": {
"ModuleProperites": {},
"TraceString": "[strm][%p] New send priority = %hu",
"UniqueId": "UpdatePriority",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
},
{
"DefinationEncoding": "hu",
"MacroVariableName": "arg3"
}
],
"macroName": "QuicTraceLogStreamInfo"
},
"IndicateStartComplete": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]",
Expand Down Expand Up @@ -12467,6 +12483,10 @@
"UniquenessHash": "21a2e517-42a5-1d18-885f-8b57b79a2fba",
"TraceID": "EventSilentDiscard"
},
{
"UniquenessHash": "1b92f995-0f00-00a4-3898-3e5121f9f323",
"TraceID": "UpdatePriority"
},
{
"UniquenessHash": "66dd140b-9fd3-eddc-e34c-cb81639e1aca",
"TraceID": "IndicateStartComplete"
Expand Down
10 changes: 9 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ void
QuicTestNthAllocFail(
);

void
QuicTestStreamPriority(
);

//
// QuicDrill tests
//
Expand Down Expand Up @@ -867,5 +871,9 @@ typedef struct {

#define IOCTL_QUIC_RUN_VALIDATE_PARAM_API \
QUIC_CTL_CODE(71, METHOD_BUFFERED, FILE_WRITE_DATA)
// int - Family

#define IOCTL_QUIC_RUN_STREAM_PRIORITY \
QUIC_CTL_CODE(72, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 71
#define QUIC_MAX_IOCTL_FUNC_CODE 72
9 changes: 9 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,15 @@ TEST(Misc, NthAllocFail) {
}
#endif

TEST(Misc, StreamPriority) {
TestLogger Logger("StreamPriority");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_STREAM_PRIORITY));
} else {
QuicTestStreamPriority();
}
}

TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {
Expand Down
Loading