Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Event Callback Reentrancy #1802

Merged
merged 2 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 71 additions & 47 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ MsQuicConnectionClose(
//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
QuicConnCloseHandle(Connection);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}

} else {

Expand Down Expand Up @@ -692,7 +699,14 @@ MsQuicStreamClose(
//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
QuicStreamClose(Stream);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}

} else {

Expand Down Expand Up @@ -785,16 +799,7 @@ MsQuicStreamStart(
goto Exit;
}

if (Connection->WorkerThreadID == CxPlatCurThreadID()) {

CXPLAT_PASSIVE_CODE();

//
// Execute this blocking API call inline if called on the worker thread.
//
Status = QuicStreamStart(Stream, Flags, FALSE);

} else if (Flags & QUIC_STREAM_START_FLAG_ASYNC) {
if (Flags & QUIC_STREAM_START_FLAG_ASYNC) {

QUIC_OPERATION* Oper =
QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
Expand Down Expand Up @@ -824,6 +829,22 @@ MsQuicStreamStart(
QuicConnQueueOper(Connection, Oper);
Status = QUIC_STATUS_PENDING;

} else if (Connection->WorkerThreadID == CxPlatCurThreadID()) {

CXPLAT_PASSIVE_CODE();

//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
Status = QuicStreamStart(Stream, Flags, FALSE);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}

} else {

CXPLAT_PASSIVE_CODE();
Expand Down Expand Up @@ -926,46 +947,35 @@ MsQuicStreamShutdown(
Connection = Stream->Connection;

QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);
QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

if (Connection->WorkerThreadID == CxPlatCurThreadID()) {
//
// Execute this blocking API call inline if called on the worker thread.
//
QuicStreamShutdown(Stream, Flags, ErrorCode);
Status = QUIC_STATUS_SUCCESS;

} else {

QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
if (Oper == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"STRM_SHUTDOWN operation",
0);
goto Error;
}
Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_SHUTDOWN;
Oper->API_CALL.Context->STRM_SHUTDOWN.Stream = Stream;
Oper->API_CALL.Context->STRM_SHUTDOWN.Flags = Flags;
Oper->API_CALL.Context->STRM_SHUTDOWN.ErrorCode = ErrorCode;
Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
if (Oper == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"STRM_SHUTDOWN operation",
0);
goto Error;
}
Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_SHUTDOWN;
Oper->API_CALL.Context->STRM_SHUTDOWN.Stream = Stream;
Oper->API_CALL.Context->STRM_SHUTDOWN.Flags = Flags;
Oper->API_CALL.Context->STRM_SHUTDOWN.ErrorCode = ErrorCode;

//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);
//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);

//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
Status = QUIC_STATUS_PENDING;
}
//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
Status = QUIC_STATUS_PENDING;

Error:

Expand Down Expand Up @@ -1356,7 +1366,14 @@ MsQuicSetParam(
//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
Status = QuicLibrarySetParam(Handle, Level, Param, BufferLength, Buffer);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}
goto Error;
}

Expand Down Expand Up @@ -1486,7 +1503,14 @@ MsQuicGetParam(
//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
Status = QuicLibraryGetParam(Handle, Level, Param, BufferLength, Buffer);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}
goto Error;
}

Expand Down
44 changes: 24 additions & 20 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ QuicConnCloseHandle(
)
{
CXPLAT_TEL_ASSERT(!Connection->State.HandleClosed);
Connection->State.HandleClosed = TRUE;

QuicConnCloseLocally(
Connection,
Expand All @@ -498,7 +499,6 @@ QuicConnCloseHandle(
QuicConnOnShutdownComplete(Connection);
}

Connection->State.HandleClosed = TRUE;
Connection->ClientCallbackHandler = NULL;

if (Connection->State.Registered) {
Expand Down Expand Up @@ -683,27 +683,32 @@ QuicConnIndicateEvent(
)
{
QUIC_STATUS Status;
if (!Connection->State.HandleClosed) {
QUIC_CONN_VERIFY(Connection, Connection->State.HandleShutdown || Connection->ClientCallbackHandler != NULL || !Connection->State.ExternalOwner);
if (Connection->ClientCallbackHandler == NULL) {
Status = QUIC_STATUS_INVALID_STATE;
QuicTraceLogConnWarning(
ApiEventNoHandler,
Connection,
"Event silently discarded (no handler).");
} else {
Status =
Connection->ClientCallbackHandler(
(HQUIC)Connection,
Connection->ClientContext,
Event);
}
if (Connection->ClientCallbackHandler != NULL) {
//
// MsQuic shouldn't indicate reentrancy to the app when at all possible.
// The general exception to this rule is when the connection is being
// closed because the API MUST block until all work is completed, so we
// have to execute the event callbacks inline.
//
CXPLAT_DBG_ASSERT(
!Connection->State.InlineApiExecution ||
Connection->State.HandleClosed);
Status =
Connection->ClientCallbackHandler(
(HQUIC)Connection,
Connection->ClientContext,
Event);
} else {
QUIC_CONN_VERIFY(
Connection,
Connection->State.HandleClosed ||
Connection->State.HandleShutdown ||
!Connection->State.ExternalOwner);
Status = QUIC_STATUS_INVALID_STATE;
QuicTraceLogConnWarning(
ApiEventAlreadyClosed,
ApiEventNoHandler,
Connection,
"Event silently discarded.");
"Event silently discarded (no handler).");
}
return Status;
}
Expand Down Expand Up @@ -1402,7 +1407,7 @@ QuicConnOnShutdownComplete(
Event.SHUTDOWN_COMPLETE.PeerAcknowledgedShutdown =
!Connection->State.ShutdownCompleteTimedOut;
Event.SHUTDOWN_COMPLETE.AppCloseInProgress =
Connection->State.AppCloseInProgress;
Connection->State.HandleClosed;
nibanks marked this conversation as resolved.
Show resolved Hide resolved

QuicTraceLogConnVerbose(
IndicateConnectionShutdownComplete,
Expand Down Expand Up @@ -6554,7 +6559,6 @@ QuicConnProcessApiOperation(
switch (ApiCtx->Type) {

case QUIC_API_TYPE_CONN_CLOSE:
Connection->State.AppCloseInProgress = TRUE;
QuicConnCloseHandle(Connection);
break;

Expand Down
14 changes: 8 additions & 6 deletions src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,18 @@ typedef union QUIC_CONNECTION_STATE {
//
BOOLEAN ResumptionEnabled : 1;

//
// Indicates that an app close from a non worker thread is in progress.
// Received by the QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE event.
//
BOOLEAN AppCloseInProgress: 1;

//
// When true, this indicates that reordering shouldn't elict an
// immediate acknowledgement.
//
BOOLEAN IgnoreReordering : 1;

//
// When true, this indicates that the connection is currently executing
// and API call inline (from a reentrant call on a callback).
nibanks marked this conversation as resolved.
Show resolved Hide resolved
//
BOOLEAN InlineApiExecution : 1;

#ifdef CxPlatVerifierEnabledByAddr
//
// The calling app is being verified (app or driver verifier).
Expand All @@ -168,6 +168,8 @@ typedef union QUIC_CONNECTION_STATE {
};
} QUIC_CONNECTION_STATE;

CXPLAT_STATIC_ASSERT(sizeof(QUIC_CONNECTION_STATE) == sizeof(uint32_t), "Ensure correct size/type");

//
// Different references on a connection.
//
Expand Down
17 changes: 16 additions & 1 deletion src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ QuicStreamClose(
_In_ __drv_freesMem(Mem) QUIC_STREAM* Stream
)
{
CXPLAT_DBG_ASSERT(!Stream->Flags.HandleClosed);
Stream->Flags.HandleClosed = TRUE;

if (!Stream->Flags.ShutdownComplete) {

if (Stream->Flags.Started) {
Expand Down Expand Up @@ -352,7 +355,6 @@ QuicStreamClose(
}
}

Stream->Flags.HandleClosed = TRUE;
Stream->ClientCallbackHandler = NULL;

QuicStreamRelease(Stream, QUIC_STREAM_REF_APP);
Expand Down Expand Up @@ -388,6 +390,19 @@ QuicStreamIndicateEvent(
{
QUIC_STATUS Status;
if (Stream->ClientCallbackHandler != NULL) {
//
// MsQuic shouldn't indicate reentrancy to the app when at all
// possible. The general exception to this rule is when the connection
// or stream is being closed because the API MUST block until all work
// is completed, so we have to execute the event callbacks inline. There
// is also one additional exception for start complete when StreamStart
// is called synchronously on an MsQuic thread.
//
CXPLAT_DBG_ASSERT(
!Stream->Connection->State.InlineApiExecution ||
Stream->Connection->State.HandleClosed ||
Stream->Flags.HandleClosed ||
Event->Type == QUIC_STREAM_EVENT_START_COMPLETE);
Status =
Stream->ClientCallbackHandler(
(HQUIC)Stream,
Expand Down
12 changes: 9 additions & 3 deletions src/plugins/dbg/quictypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,16 @@ typedef union QUIC_CONNECTION_STATE {
BOOLEAN ResumptionEnabled : 1;

//
// Indicates that an app close from a non worker thread is in progress.
// Received by the QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE event.
// When true, this indicates that reordering shouldn't elict an
// immediate acknowledgement.
//
BOOLEAN AppCloseInProgress: 1;
BOOLEAN IgnoreReordering : 1;

//
// When true, this indicates that the connection is currently executing
// and API call inline (from a reentrant call on a callback).
//
BOOLEAN InlineApiExecution : 1;

#ifdef CxPlatVerifierEnabledByAddr
//
Expand Down