Skip to content

Commit

Permalink
Stream Priority Support (#1778)
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks authored Jul 6, 2021
1 parent 3874fab commit 5677bb6
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 37 deletions.
14 changes: 2 additions & 12 deletions scripts/update-sidecar.ps1
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
<#
.SYNOPSIS
This updates/regenerates the CLOG sidecar file.
.PARAMETER Clean
Deletes the old sidecar file first.
This regenerates the CLOG sidecar file.
#>

param (
[Parameter(Mandatory = $false)]
[switch]$Clean = $false
)

Set-StrictMode -Version 'Latest'
$PSDefaultParameterValues['*:ErrorAction'] = 'Stop'

Expand All @@ -35,9 +27,7 @@ $ConfigFile = Join-Path $SrcDir "manifest" "msquic.clog_config"
$OutputDir = Join-Path $RootDir "build" "tmp"
New-Item -Path $OutputDir -ItemType Directory -Force | Out-Null

if ($Clean) {
Remove-Item $Sidecar -Force -ErrorAction Ignore | Out-Null
}
Remove-Item $Sidecar -Force -ErrorAction Ignore | Out-Null

foreach ($File in $Files) {
clog -p windows --scopePrefix "QUIC" -s $Sidecar -c $ConfigFile -i $File --outputDirectory "$OutputDir"
Expand Down
1 change: 0 additions & 1 deletion src/core/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,6 @@ QuicCryptoCustomCertValidationComplete(
_In_ BOOLEAN Result
)
{
CXPLAT_TEL_ASSERT(Crypto->CertValidationPending);
if (!Crypto->CertValidationPending) {
return;
}
Expand Down
58 changes: 54 additions & 4 deletions src/core/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,19 @@ QuicSendQueueFlushForStream(
// Not previously queued, so add the stream to the end of the queue.
//
CXPLAT_DBG_ASSERT(Stream->SendLink.Flink == NULL);
CxPlatListInsertTail(&Send->SendStreams, &Stream->SendLink);
CXPLAT_LIST_ENTRY* Entry = Send->SendStreams.Blink;
while (Entry != &Send->SendStreams) {
//
// Search back to front for the right place (based on priority) to
// insert the stream.
//
if (Stream->SendPriority <=
CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
Entry = Entry->Blink;
}
CxPlatListInsertHead(Entry, &Stream->SendLink); // Insert after current Entry
QuicStreamAddRef(Stream, QUIC_STREAM_REF_SEND);
}

Expand All @@ -172,6 +184,31 @@ QuicSendQueueFlushForStream(
}
}

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicSendUpdateStreamPriority(
_In_ QUIC_SEND* Send,
_In_ QUIC_STREAM* Stream
)
{
CXPLAT_DBG_ASSERT(Stream->SendLink.Flink != NULL);
CxPlatListEntryRemove(&Stream->SendLink);

CXPLAT_LIST_ENTRY* Entry = Send->SendStreams.Blink;
while (Entry != &Send->SendStreams) {
//
// Search back to front for the right place (based on priority) to
// insert the stream.
//
if (Stream->SendPriority <=
CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
Entry = Entry->Blink;
}
CxPlatListInsertHead(Entry, &Stream->SendLink); // Insert after current Entry
}

#if DEBUG
_IRQL_requires_max_(DISPATCH_LEVEL)
void
Expand Down Expand Up @@ -888,10 +925,23 @@ QuicSendGetNextStream(

if (Connection->State.UseRoundRobinStreamScheduling) {
//
// Move the stream to the end of the queue.
// Move the stream after any streams of the same priority. Start
// with the "next" entry in the list and keep going until the
// next entry's priority is less. Then move the stream before
// that entry.
//
CxPlatListEntryRemove(&Stream->SendLink);
CxPlatListInsertTail(&Send->SendStreams, &Stream->SendLink);
CXPLAT_LIST_ENTRY* LastEntry = Stream->SendLink.Flink;
while (Stream->SendLink.Flink != &Send->SendStreams) {
if (Stream->SendPriority >
CXPLAT_CONTAINING_RECORD(LastEntry, QUIC_STREAM, SendLink)->SendPriority) {
break;
}
LastEntry = LastEntry->Flink;
}
if (LastEntry->Blink != &Stream->SendLink) {
CxPlatListEntryRemove(&Stream->SendLink);
CxPlatListInsertTail(LastEntry, &Stream->SendLink);
}

*PacketCount = QUIC_STREAM_SEND_BATCH_COUNT;

Expand Down
10 changes: 10 additions & 0 deletions src/core/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ QuicSendQueueFlushForStream(
_In_ BOOLEAN DelaySend
);

//
// Updates the stream's order in response to a priority change.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicSendUpdateStreamPriority(
_In_ QUIC_SEND* Send,
_In_ QUIC_STREAM* Stream
);

//
// Tries to drain all queued data that needs to be sent. Returns TRUE if all the
// data was drained.
Expand Down
71 changes: 64 additions & 7 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ QuicStreamInitialize(
Stream->RecvMaxLength = UINT64_MAX;
Stream->RefCount = 1;
Stream->SendRequestsTail = &Stream->SendRequests;
Stream->SendPriority = QUIC_STREAM_PRIORITY_DEFAULT;
CxPlatDispatchLockInitialize(&Stream->ApiSendRequestLock);
CxPlatRefInitialize(&Stream->RefCount);
QuicRangeInitialize(
Expand Down Expand Up @@ -538,11 +539,49 @@ QuicStreamParamSet(
const void* Buffer
)
{
UNREFERENCED_PARAMETER(Stream);
UNREFERENCED_PARAMETER(Param);
UNREFERENCED_PARAMETER(BufferLength);
UNREFERENCED_PARAMETER(Buffer);
return QUIC_STATUS_INVALID_PARAMETER;
QUIC_STATUS Status;

switch (Param) {
case QUIC_PARAM_STREAM_ID:
case QUIC_PARAM_STREAM_0RTT_LENGTH:
case QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;

case QUIC_PARAM_STREAM_PRIORITY: {

if (BufferLength != sizeof(Stream->SendPriority)) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

if (Stream->SendPriority != *(uint16_t*)Buffer) {
Stream->SendPriority = *(uint16_t*)Buffer;

QuicTraceLogStreamInfo(
UpdatePriority,
Stream,
"New send priority = %hu",
Stream->SendPriority);

if (Stream->Flags.Started && Stream->SendFlags != 0) {
//
// Update the stream's place in the send queue if necessary.
//
QuicSendUpdateStreamPriority(&Stream->Connection->Send, Stream);
}
}

Status = QUIC_STATUS_SUCCESS;
break;
}

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

return Status;
}

QUIC_STATUS
Expand All @@ -556,8 +595,7 @@ QuicStreamParamGet(
{
QUIC_STATUS Status;

switch (Param)
{
switch (Param) {
case QUIC_PARAM_STREAM_ID:

if (*BufferLength < sizeof(Stream->ID)) {
Expand Down Expand Up @@ -627,6 +665,25 @@ QuicStreamParamGet(
Status = QUIC_STATUS_SUCCESS;
break;

case QUIC_PARAM_STREAM_PRIORITY:

if (*BufferLength < sizeof(Stream->SendPriority)) {
*BufferLength = sizeof(Stream->SendPriority);
Status = QUIC_STATUS_BUFFER_TOO_SMALL;
break;
}

if (Buffer == NULL) {
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
}

*BufferLength = sizeof(Stream->SendPriority);
*(uint16_t*)Buffer = Stream->SendPriority;

Status = QUIC_STATUS_SUCCESS;
break;

default:
Status = QUIC_STATUS_INVALID_PARAMETER;
break;
Expand Down
8 changes: 8 additions & 0 deletions src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef struct QUIC_CONNECTION QUIC_CONNECTION;
QUIC_SEND_FLAG_BUFFERED \
)

#define QUIC_STREAM_PRIORITY_DEFAULT 0x7FFF // Medium priority by default

//
// Tracks the data queued up for sending by an application.
//
Expand Down Expand Up @@ -342,6 +344,12 @@ typedef struct QUIC_STREAM {
//
QUIC_RANGE SparseAckRanges;

//
// The relative priority between the different streams that determines the
// order that queued data will be sent out.
//
uint16_t SendPriority;

//
// Recv State
//
Expand Down
1 change: 1 addition & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ typedef struct QUIC_SCHANNEL_CONTEXT_ATTRIBUTE_W {
#define QUIC_PARAM_STREAM_ID 0x1C000000 // QUIC_UINT62
#define QUIC_PARAM_STREAM_0RTT_LENGTH 0x1C000001 // uint64_t
#define QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE 0x1C000002 // uint64_t - bytes
#define QUIC_PARAM_STREAM_PRIORITY 0x1C000003 // uint16_t - 0 (low) to 0xFFFF (high) - 0x7FFF (default)

typedef
_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down
41 changes: 41 additions & 0 deletions src/inc/msquic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,47 @@ struct MsQuicStream {
return MsQuic->StreamReceiveSetEnabled(Handle, IsEnabled ? TRUE : FALSE);
}

QUIC_STATUS
GetID(_Out_ QUIC_UINT62* ID) const noexcept {
uint32_t Size = sizeof(*ID);
return
MsQuic->GetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_ID,
&Size,
ID);
}

QUIC_UINT62 ID() const noexcept {
QUIC_UINT62 ID;
GetID(&ID);
return ID;
}

QUIC_STATUS
SetPriority(_In_ uint16_t Priority) noexcept {
return
MsQuic->SetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_PRIORITY,
sizeof(Priority),
&Priority);
}

QUIC_STATUS
GetPriority(_Out_ uint16_t* Priority) const noexcept {
uint32_t Size = sizeof(*Priority);
return
MsQuic->GetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_PRIORITY,
&Size,
Priority);
}

QUIC_STATUS GetInitStatus() const noexcept { return InitStatus; }
bool IsValid() const { return QUIC_SUCCEEDED(InitStatus); }
MsQuicStream(MsQuicStream& other) = delete;
Expand Down
20 changes: 20 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -6720,6 +6720,22 @@
],
"macroName": "QuicTraceLogStreamWarning"
},
"UpdatePriority": {
"ModuleProperites": {},
"TraceString": "[strm][%p] New send priority = %hu",
"UniqueId": "UpdatePriority",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
},
{
"DefinationEncoding": "hu",
"MacroVariableName": "arg3"
}
],
"macroName": "QuicTraceLogStreamInfo"
},
"IndicateStartComplete": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Indicating QUIC_STREAM_EVENT_START_COMPLETE [Status=0x%x ID=%llu Accepted=%hhu]",
Expand Down Expand Up @@ -12467,6 +12483,10 @@
"UniquenessHash": "21a2e517-42a5-1d18-885f-8b57b79a2fba",
"TraceID": "EventSilentDiscard"
},
{
"UniquenessHash": "1b92f995-0f00-00a4-3898-3e5121f9f323",
"TraceID": "UpdatePriority"
},
{
"UniquenessHash": "66dd140b-9fd3-eddc-e34c-cb81639e1aca",
"TraceID": "IndicateStartComplete"
Expand Down
10 changes: 9 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ void
QuicTestNthAllocFail(
);

void
QuicTestStreamPriority(
);

//
// QuicDrill tests
//
Expand Down Expand Up @@ -867,5 +871,9 @@ typedef struct {

#define IOCTL_QUIC_RUN_VALIDATE_PARAM_API \
QUIC_CTL_CODE(71, METHOD_BUFFERED, FILE_WRITE_DATA)
// int - Family

#define IOCTL_QUIC_RUN_STREAM_PRIORITY \
QUIC_CTL_CODE(72, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 71
#define QUIC_MAX_IOCTL_FUNC_CODE 72
9 changes: 9 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,15 @@ TEST(Misc, NthAllocFail) {
}
#endif

TEST(Misc, StreamPriority) {
TestLogger Logger("StreamPriority");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_STREAM_PRIORITY));
} else {
QuicTestStreamPriority();
}
}

TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {
Expand Down
Loading

0 comments on commit 5677bb6

Please sign in to comment.