From 7f172f1cd0767348856b6f080b64c5b7f660db3b Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 30 Nov 2023 14:49:42 +0100 Subject: [PATCH 1/4] stop passing null on MT --- .../BrowserWebSockets/BrowserWebSocket.cs | 47 ++++++++++++------- .../tests/SendReceiveTest.cs | 6 ++- src/mono/wasm/runtime/web-socket.ts | 36 +++++++++++--- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs index 5abe81f919acf..618ea16c0886f 100644 --- a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs +++ b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs @@ -476,13 +476,17 @@ private async Task SendAsyncCore(ArraySegment buffer, WebSocketMessageType try { var sendTask = BrowserInterop.UnsafeSendSync(_innerWebSocket!, buffer, messageType, endOfMessage); - if (sendTask == null) + if (sendTask != null) { - // return synchronously - return; + await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true); } - - await CancelationHelper(sendTask, cancellationToken, FastState).ConfigureAwait(true); +#if FEATURE_WASM_THREADS + // return synchronously, not supported with MT + else + { + Environment.FailFast("Unexpected synchronous result"); + } +#endif } catch (JSException ex) { @@ -502,19 +506,18 @@ private async Task ReceiveAsyncCore(ArraySegment b using (MemoryHandle pinBuffer = bufferMemory.Pin()) { var receiveTask = BrowserInterop.ReceiveUnsafeSync(_innerWebSocket!, pinBuffer, bufferMemory.Length); - if (receiveTask == null) + if (receiveTask != null) { - // return synchronously -#if FEATURE_WASM_THREADS - lock (_thisLock) - { -#endif - return ConvertResponse(this); + await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true); + } #if FEATURE_WASM_THREADS - } //lock -#endif + // return synchronously, not supported with MT + else + { + Environment.FailFast("Unexpected synchronous result"); } - await CancelationHelper(receiveTask, cancellationToken, FastState).ConfigureAwait(true); +#endif + #if FEATURE_WASM_THREADS lock (_thisLock) @@ -555,8 +558,18 @@ private async Task CloseAsyncCore(WebSocketCloseStatus closeStatus, string? stat _closeStatus = closeStatus; _closeStatusDescription = statusDescription; - var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived) ?? Task.CompletedTask; - await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true); + var closeTask = BrowserInterop.WebSocketClose(_innerWebSocket!, (int)closeStatus, statusDescription, waitForCloseReceived); + if (closeTask != null) + { + await CancelationHelper(closeTask, cancellationToken, FastState).ConfigureAwait(true); + } +#if FEATURE_WASM_THREADS + // return synchronously, not supported with MT + else + { + Environment.FailFast("Unexpected synchronous result"); + } +#endif #if FEATURE_WASM_THREADS lock (_thisLock) diff --git a/src/libraries/System.Net.WebSockets.Client/tests/SendReceiveTest.cs b/src/libraries/System.Net.WebSockets.Client/tests/SendReceiveTest.cs index 9667eb1733b81..2016f4f7285a8 100644 --- a/src/libraries/System.Net.WebSockets.Client/tests/SendReceiveTest.cs +++ b/src/libraries/System.Net.WebSockets.Client/tests/SendReceiveTest.cs @@ -514,7 +514,11 @@ public async Task ZeroByteReceive_CompletesWhenDataAvailable(Uri server) // Now do a receive to get the payload. var receiveBuffer = new byte[1]; t = ReceiveAsync(cws, new ArraySegment(receiveBuffer), ctsDefault.Token); - Assert.Equal(TaskStatus.RanToCompletion, t.Status); + // this is not synchronously possible when the WS client is on another WebWorker + if(!PlatformDetection.IsWasmThreadingSupported) + { + Assert.Equal(TaskStatus.RanToCompletion, t.Status); + } r = await t; Assert.Equal(WebSocketMessageType.Binary, r.MessageType); diff --git a/src/mono/wasm/runtime/web-socket.ts b/src/mono/wasm/runtime/web-socket.ts index 9b453c19c69d2..cfa50cf739cfd 100644 --- a/src/mono/wasm/runtime/web-socket.ts +++ b/src/mono/wasm/runtime/web-socket.ts @@ -130,7 +130,12 @@ export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer const whole_buffer = _mono_wasm_web_socket_send_buffering(ws, buffer_view, message_type, end_of_message); if (!end_of_message || !whole_buffer) { - return null; + if (MonoWasmThreads) { + return Promise.resolve(); + } else { + // finish synchronously + return null; + } } return _mono_wasm_web_socket_send_and_wait(ws, whole_buffer); @@ -150,10 +155,14 @@ export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buf if (receive_event_queue.getLength()) { mono_assert(receive_promise_queue.getLength() == 0, "ERR20: Invalid WS state"); - // finish synchronously _mono_wasm_web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length); - return null; + if (MonoWasmThreads) { + return Promise.resolve(); + } else { + // finish synchronously + return null; + } } const { promise, promise_control } = createPromiseController(); const receive_promise_control = promise_control as ReceivePromiseControl; @@ -168,7 +177,12 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri mono_assert(!!ws, "ERR19: expected ws instance"); if (ws.readyState == WebSocket.CLOSED) { - return null; + if (MonoWasmThreads) { + return Promise.resolve(); + } else { + // finish synchronously + return null; + } } if (wait_for_close_received) { @@ -192,7 +206,12 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri } else { ws.close(code); } - return null; + if (MonoWasmThreads) { + return Promise.resolve(); + } else { + // finish synchronously + return null; + } } } @@ -239,7 +258,12 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view // Otherwise we block so that we apply some backpresure to the application sending large data. // this is different from Managed implementation if (ws.bufferedAmount < ws_send_buffer_blocking_threshold) { - return null; + if (MonoWasmThreads) { + return Promise.resolve(); + } else { + // finish synchronously + return null; + } } // block the promise/task until the browser passed the buffer to OS From 69dc13127c24b6278dc91d99a9cccfd909fc000a Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 30 Nov 2023 21:50:45 +0100 Subject: [PATCH 2/4] make the HTTP+WS clients much more async, fix tests --- .../System/Net/Http/HttpClientHandlerTest.cs | 16 +++- .../BrowserHttpHandler/BrowserHttpHandler.cs | 38 +++++---- .../BrowserWebSockets/BrowserWebSocket.cs | 12 +-- .../tests/CloseTest.cs | 5 +- .../SynchronizationContextExtensions.cs | 80 +++++++++++++++++++ src/mono/wasm/runtime/cancelable-promise.ts | 6 ++ src/mono/wasm/runtime/web-socket.ts | 51 +++++------- 7 files changed, 149 insertions(+), 59 deletions(-) diff --git a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs index 857c4f71aa3f4..0ffd0d4ae82ed 100644 --- a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs +++ b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs @@ -1391,8 +1391,20 @@ await server.AcceptConnectionAsync(async connection => { await connection.ReadRequestDataAsync(); tcs2.SetResult(true); - await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false); - await connection.SendResponseBodyAsync("1\r\nh\r\n", false); + try + { + await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false); + await connection.SendResponseBodyAsync("1\r\nh\r\n", false); + } + catch (IOException ex) + { + // when testing in the browser, we are using the WebSocket for the loopback + // it could get disconnected after the cancellation above, earlier than the server-side gets chance to write the response + if (!(ex.InnerException is InvalidOperationException ivd) || !ivd.Message.Contains("The WebSocket is not connected")) + { + throw; + } + } await tcs.Task; }); }); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs index b1ef3b6c8999c..218a51b441fe8 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/BrowserHttpHandler.cs @@ -334,7 +334,7 @@ protected internal override Task SendAsync(HttpRequestMessa { bool? allowAutoRedirect = _isAllowAutoRedirectTouched ? AllowAutoRedirect : null; #if FEATURE_WASM_THREADS - return JSHost.CurrentOrMainJSSynchronizationContext.Send(() => + return JSHost.CurrentOrMainJSSynchronizationContext.Post(() => { #endif return Impl(request, cancellationToken, allowAutoRedirect); @@ -365,7 +365,7 @@ private Task WriteAsyncCore(ReadOnlyMemory buffer, CancellationToken cance { cancellationToken.ThrowIfCancellationRequested(); #if FEATURE_WASM_THREADS - return _transformStream.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken)); + return _transformStream.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken)); #else return Impl(this, buffer, cancellationToken); #endif @@ -474,24 +474,25 @@ public void Dispose() return; #if FEATURE_WASM_THREADS - FetchResponse?.SynchronizationContext.Send(static (WasmFetchResponse self) => + FetchResponse?.SynchronizationContext.Post(static (WasmFetchResponse self) => { lock (self.ThisLock) { - if (self._isDisposed) - return; - self._isDisposed = true; - self._abortRegistration.Dispose(); - self._abortController.Dispose(); - if (!self.FetchResponse!.IsDisposed) + if (!self._isDisposed) { - BrowserHttpInterop.AbortResponse(self.FetchResponse); + self._isDisposed = true; + self._abortRegistration.Dispose(); + self._abortController.Dispose(); + if (!self.FetchResponse!.IsDisposed) + { + BrowserHttpInterop.AbortResponse(self.FetchResponse); + } + self.FetchResponse.Dispose(); + self.FetchResponse = null; } - self.FetchResponse.Dispose(); - self.FetchResponse = null; + return Task.CompletedTask; } }, this); - #else _isDisposed = true; _abortRegistration.Dispose(); @@ -521,7 +522,7 @@ public BrowserHttpContent(WasmFetchResponse fetchResponse) _fetchResponse = fetchResponse; } - // TODO alocate smaller buffer and call multiple times + // TODO allocate smaller buffer and call multiple times private async ValueTask GetResponseData(CancellationToken cancellationToken) { Task promise; @@ -557,12 +558,13 @@ protected override Task CreateContentReadStreamAsync() { _fetchResponse.ThrowIfDisposed(); #if FEATURE_WASM_THREADS - return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this)); + return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this)); #else return Impl(this); #endif static async Task Impl(BrowserHttpContent self) { + self._fetchResponse.ThrowIfDisposed(); byte[] data = await self.GetResponseData(CancellationToken.None).ConfigureAwait(true); return new MemoryStream(data, writable: false); } @@ -576,13 +578,14 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext? ArgumentNullException.ThrowIfNull(stream, nameof(stream)); _fetchResponse.ThrowIfDisposed(); #if FEATURE_WASM_THREADS - return _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, stream, cancellationToken)); + return _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, stream, cancellationToken)); #else return Impl(this, stream, cancellationToken); #endif static async Task Impl(BrowserHttpContent self, Stream stream, CancellationToken cancellationToken) { + self._fetchResponse.ThrowIfDisposed(); byte[] data = await self.GetResponseData(cancellationToken).ConfigureAwait(true); await stream.WriteAsync(data, cancellationToken).ConfigureAwait(true); } @@ -621,13 +624,14 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation ArgumentNullException.ThrowIfNull(buffer, nameof(buffer)); _fetchResponse.ThrowIfDisposed(); #if FEATURE_WASM_THREADS - return await _fetchResponse.FetchResponse!.SynchronizationContext.Send(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true); + return await _fetchResponse.FetchResponse!.SynchronizationContext.Post(() => Impl(this, buffer, cancellationToken)).ConfigureAwait(true); #else return await Impl(this, buffer, cancellationToken).ConfigureAwait(true); #endif static async Task Impl(WasmHttpReadStream self, Memory buffer, CancellationToken cancellationToken) { + self._fetchResponse.ThrowIfDisposed(); Task promise; using (Buffers.MemoryHandle handle = buffer.Pin()) { diff --git a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs index 618ea16c0886f..90c296c68e01d 100644 --- a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs +++ b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs @@ -130,7 +130,7 @@ internal Task ConnectAsync(Uri uri, List? requestedSubProtocols, Cancell } }, null); - return JSHost.CurrentOrMainJSSynchronizationContext.Send(() => + return JSHost.CurrentOrMainJSSynchronizationContext.Post(() => { return ConnectAsyncCore(cancellationToken); }); @@ -167,7 +167,7 @@ public override Task SendAsync(ArraySegment buffer, WebSocketMessageType m WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer)); #if FEATURE_WASM_THREADS - return _innerWebSocket!.SynchronizationContext.Send(() => + return _innerWebSocket!.SynchronizationContext.Post(() => { Task promise; lock (_thisLock) @@ -200,7 +200,7 @@ public override Task ReceiveAsync(ArraySegment buf WebSocketValidate.ValidateArraySegment(buffer, nameof(buffer)); #if FEATURE_WASM_THREADS - return _innerWebSocket!.SynchronizationContext.Send(() => + return _innerWebSocket!.SynchronizationContext.Post(() => { Task promise; lock (_thisLock) @@ -228,7 +228,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? } #if FEATURE_WASM_THREADS - return _innerWebSocket!.SynchronizationContext.Send(() => + return _innerWebSocket!.SynchronizationContext.Post(() => { Task promise; lock (_thisLock) @@ -240,7 +240,7 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? { throw new WebSocketException(WebSocketError.InvalidState, SR.Format(SR.net_WebSockets_InvalidState, state, "Connecting, Open, CloseSent, Aborted")); } - if(state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted) + if (state != WebSocketState.Open && state != WebSocketState.Connecting && state != WebSocketState.Aborted) { return Task.CompletedTask; } @@ -268,7 +268,7 @@ public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? status } #if FEATURE_WASM_THREADS - return _innerWebSocket!.SynchronizationContext.Send(() => + return _innerWebSocket!.SynchronizationContext.Post(() => { Task promise; lock (_thisLock) diff --git a/src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs b/src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs index 9e33810098f7d..26dfd9a4fd01b 100644 --- a/src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs +++ b/src/libraries/System.Net.WebSockets.Client/tests/CloseTest.cs @@ -224,7 +224,7 @@ await Assert.ThrowsAnyAsync(async () => [OuterLoop("Uses external servers", typeof(PlatformDetection), nameof(PlatformDetection.LocalEchoServerIsNotAvailable))] [ConditionalTheory(nameof(WebSocketsSupported)), MemberData(nameof(EchoServers))] - [ActiveIssue("https://github.com/dotnet/runtime/issues/83517", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))] + [SkipOnPlatform(TestPlatforms.Browser, "This never really worked for browser, it was just lucky timing that browser's `close` event was executed in next browser tick, for this test. See also https://github.com/dotnet/runtime/issues/45538")] public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri server) { string message = "Hello WebSockets!"; @@ -233,8 +233,7 @@ public async Task CloseOutputAsync_ClientInitiated_CanReceive_CanClose(Uri serve { var cts = new CancellationTokenSource(TimeOutMilliseconds); - // See issue for Browser websocket differences https://github.com/dotnet/runtime/issues/45538 - var closeStatus = PlatformDetection.IsBrowser ? WebSocketCloseStatus.NormalClosure : WebSocketCloseStatus.InvalidPayloadData; + var closeStatus = WebSocketCloseStatus.InvalidPayloadData; string closeDescription = "CloseOutputAsync_Client_InvalidPayloadData"; await cws.SendAsync(WebSocketData.GetBufferFromText(message), WebSocketMessageType.Text, true, cts.Token); diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs index bebe252a409e6..0e3fb01dcd278 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs @@ -61,6 +61,86 @@ public static TRes Send(this SynchronizationContext? self, Func body return value!; } + public static Task Post(this SynchronizationContext? self, Func> body) + { + if (self == null) return body(); + + TaskCompletionSource tcs = new TaskCompletionSource(); + self.Post(async (_) => + { + try + { + var value = await body().ConfigureAwait(false); + tcs.TrySetResult(value); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + + public static Task Post(this SynchronizationContext? self, Func> body, T1 p1) + { + if (self == null) return body(p1); + + TaskCompletionSource tcs = new TaskCompletionSource(); + self.Post(async (_) => + { + try + { + var value = await body(p1).ConfigureAwait(false); + tcs.TrySetResult(value); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + + public static Task Post(this SynchronizationContext? self, Func body, T1 p1) + { + if (self == null) return body(p1); + + TaskCompletionSource tcs = new TaskCompletionSource(); + self.Post(async (_) => + { + try + { + await body(p1).ConfigureAwait(false); + tcs.TrySetResult(); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + + public static Task Post(this SynchronizationContext? self, Func body) + { + if (self == null) return body(); + + TaskCompletionSource tcs = new TaskCompletionSource(); + self.Post(async (_) => + { + try + { + await body().ConfigureAwait(false); + tcs.TrySetResult(); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }, null); + return tcs.Task; + } + public static TRes Send(this SynchronizationContext? self, Func body, T1 p1) { if (self == null) return body(p1); diff --git a/src/mono/wasm/runtime/cancelable-promise.ts b/src/mono/wasm/runtime/cancelable-promise.ts index 3ae287bf99e4d..275e243d87933 100644 --- a/src/mono/wasm/runtime/cancelable-promise.ts +++ b/src/mono/wasm/runtime/cancelable-promise.ts @@ -22,6 +22,12 @@ export function wrap_as_cancelable_promise(fn: () => Promise): Controllabl return promise; } +export function wrap_as_cancelable(inner: Promise): ControllablePromise { + const { promise, promise_control } = createPromiseController(); + inner.then((data) => promise_control.resolve(data)).catch((reason) => promise_control.reject(reason)); + return promise; +} + export function mono_wasm_cancel_promise(task_holder_gcv_handle: GCHandle): void { const holder = _lookup_js_owned_object(task_holder_gcv_handle) as PromiseHolder; mono_assert(!!holder, () => `Expected Promise for GCVHandle ${task_holder_gcv_handle}`); diff --git a/src/mono/wasm/runtime/web-socket.ts b/src/mono/wasm/runtime/web-socket.ts index cfa50cf739cfd..8820bbc11f838 100644 --- a/src/mono/wasm/runtime/web-socket.ts +++ b/src/mono/wasm/runtime/web-socket.ts @@ -12,6 +12,7 @@ import { PromiseController } from "./types/internal"; import { mono_log_warn } from "./logging"; import { viewOrCopy, utf8ToStringRelaxed, stringToUTF8 } from "./strings"; import { IDisposable } from "./marshal"; +import { wrap_as_cancelable } from "./cancelable-promise"; const wasm_ws_pending_send_buffer = Symbol.for("wasm ws_pending_send_buffer"); const wasm_ws_pending_send_buffer_offset = Symbol.for("wasm ws_pending_send_buffer_offset"); @@ -19,6 +20,7 @@ const wasm_ws_pending_send_buffer_type = Symbol.for("wasm ws_pending_send_buffer const wasm_ws_pending_receive_event_queue = Symbol.for("wasm ws_pending_receive_event_queue"); const wasm_ws_pending_receive_promise_queue = Symbol.for("wasm ws_pending_receive_promise_queue"); const wasm_ws_pending_open_promise = Symbol.for("wasm ws_pending_open_promise"); +const wasm_ws_pending_open_promise_used = Symbol.for("wasm wasm_ws_pending_open_promise_used"); const wasm_ws_pending_close_promises = Symbol.for("wasm ws_pending_close_promises"); const wasm_ws_pending_send_promises = Symbol.for("wasm ws_pending_send_promises"); const wasm_ws_is_aborted = Symbol.for("wasm ws_is_aborted"); @@ -120,6 +122,7 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece export function ws_wasm_open(ws: WebSocketExtension): Promise | null { mono_assert(!!ws, "ERR17: expected ws instance"); const open_promise_control = ws[wasm_ws_pending_open_promise]; + ws[wasm_ws_pending_open_promise_used] = true; return open_promise_control.promise; } @@ -130,12 +133,7 @@ export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer const whole_buffer = _mono_wasm_web_socket_send_buffering(ws, buffer_view, message_type, end_of_message); if (!end_of_message || !whole_buffer) { - if (MonoWasmThreads) { - return Promise.resolve(); - } else { - // finish synchronously - return null; - } + return resolvedPromise(); } return _mono_wasm_web_socket_send_and_wait(ws, whole_buffer); @@ -157,12 +155,7 @@ export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buf _mono_wasm_web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length); - if (MonoWasmThreads) { - return Promise.resolve(); - } else { - // finish synchronously - return null; - } + return resolvedPromise(); } const { promise, promise_control } = createPromiseController(); const receive_promise_control = promise_control as ReceivePromiseControl; @@ -177,12 +170,7 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri mono_assert(!!ws, "ERR19: expected ws instance"); if (ws.readyState == WebSocket.CLOSED) { - if (MonoWasmThreads) { - return Promise.resolve(); - } else { - // finish synchronously - return null; - } + return resolvedPromise(); } if (wait_for_close_received) { @@ -206,12 +194,7 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri } else { ws.close(code); } - if (MonoWasmThreads) { - return Promise.resolve(); - } else { - // finish synchronously - return null; - } + return resolvedPromise(); } } @@ -234,7 +217,8 @@ export function ws_wasm_abort(ws: WebSocketExtension): void { function reject_promises(ws: WebSocketExtension, error: Error) { const open_promise_control = ws[wasm_ws_pending_open_promise]; - if (open_promise_control) { + const open_promise_used = ws[wasm_ws_pending_open_promise_used]; + if (open_promise_control && open_promise_used) { open_promise_control.reject(error); } for (const close_promise_control of ws[wasm_ws_pending_close_promises]) { @@ -258,12 +242,7 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view // Otherwise we block so that we apply some backpresure to the application sending large data. // this is different from Managed implementation if (ws.bufferedAmount < ws_send_buffer_blocking_threshold) { - if (MonoWasmThreads) { - return Promise.resolve(); - } else { - // finish synchronously - return null; - } + return resolvedPromise(); } // block the promise/task until the browser passed the buffer to OS @@ -427,6 +406,7 @@ type WebSocketExtension = WebSocket & { [wasm_ws_pending_receive_event_queue]: Queue; [wasm_ws_pending_receive_promise_queue]: Queue; [wasm_ws_pending_open_promise]: PromiseController + [wasm_ws_pending_open_promise_used]: boolean [wasm_ws_pending_send_promises]: PromiseController[] [wasm_ws_pending_close_promises]: PromiseController[] [wasm_ws_is_aborted]: boolean @@ -448,3 +428,12 @@ type Message = { data: Uint8Array, offset: number } + +function resolvedPromise(): Promise | null { + if (MonoWasmThreads) { + return wrap_as_cancelable(Promise.resolve()); + } else { + // finish synchronously + return null; + } +} \ No newline at end of file From cba940317a19b119db06b409c6c72648abd73175 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Thu, 30 Nov 2023 22:05:27 +0100 Subject: [PATCH 3/4] explain --- src/mono/wasm/runtime/web-socket.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/mono/wasm/runtime/web-socket.ts b/src/mono/wasm/runtime/web-socket.ts index 8820bbc11f838..c5a69ddf267ae 100644 --- a/src/mono/wasm/runtime/web-socket.ts +++ b/src/mono/wasm/runtime/web-socket.ts @@ -218,6 +218,10 @@ export function ws_wasm_abort(ws: WebSocketExtension): void { function reject_promises(ws: WebSocketExtension, error: Error) { const open_promise_control = ws[wasm_ws_pending_open_promise]; const open_promise_used = ws[wasm_ws_pending_open_promise_used]; + + // when `open_promise_used` is false, we should not reject it, + // because it would be unhandled rejection. Nobody is subscribed yet. + // The subscription comes on the next call, which is `ws_wasm_open`, but cancelation/abort could happen in the meantime. if (open_promise_control && open_promise_used) { open_promise_control.reject(error); } From 1f9b920f89ad150f398bb0e5d336935b75b90aa0 Mon Sep 17 00:00:00 2001 From: pavelsavara Date: Fri, 1 Dec 2023 16:33:39 +0100 Subject: [PATCH 4/4] feedback --- .../BrowserWebSockets/BrowserWebSocket.cs | 8 +++--- .../InteropServices/JavaScript/JSHost.cs | 6 ++-- .../SynchronizationContextExtensions.cs | 28 ++++--------------- src/mono/wasm/runtime/web-socket.ts | 14 +++++++--- 4 files changed, 23 insertions(+), 33 deletions(-) diff --git a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs index 90c296c68e01d..68120a5cf8f7e 100644 --- a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs +++ b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs @@ -120,7 +120,7 @@ internal Task ConnectAsync(Uri uri, List? requestedSubProtocols, Cancell throw new InvalidOperationException(SR.net_WebSockets_AlreadyStarted); } #if FEATURE_WASM_THREADS - JSHost.CurrentOrMainJSSynchronizationContext!.Send(_ => + JSHost.CurrentOrMainJSSynchronizationContext.Send(_ => { lock (_thisLock) { @@ -484,7 +484,7 @@ private async Task SendAsyncCore(ArraySegment buffer, WebSocketMessageType // return synchronously, not supported with MT else { - Environment.FailFast("Unexpected synchronous result"); + Environment.FailFast("BrowserWebSocket.SendAsyncCore: Unexpected synchronous result"); } #endif } @@ -514,7 +514,7 @@ private async Task ReceiveAsyncCore(ArraySegment b // return synchronously, not supported with MT else { - Environment.FailFast("Unexpected synchronous result"); + Environment.FailFast("BrowserWebSocket.ReceiveAsyncCore: Unexpected synchronous result"); } #endif @@ -567,7 +567,7 @@ private async Task CloseAsyncCore(WebSocketCloseStatus closeStatus, string? stat // return synchronously, not supported with MT else { - Environment.FailFast("Unexpected synchronous result"); + Environment.FailFast("BrowserWebSocket.CloseAsyncCore: Unexpected synchronous result"); } #endif diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHost.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHost.cs index 69b91b88a5905..89a7eb8a0246e 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHost.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSHost.cs @@ -59,15 +59,15 @@ public static Task ImportAsync(string moduleName, string moduleUrl, Ca return JSHostImplementation.ImportAsync(moduleName, moduleUrl, cancellationToken); } - public static SynchronizationContext? CurrentOrMainJSSynchronizationContext + public static SynchronizationContext CurrentOrMainJSSynchronizationContext { [MethodImpl(MethodImplOptions.AggressiveInlining)] get { #if FEATURE_WASM_THREADS - return JSSynchronizationContext.CurrentJSSynchronizationContext ?? JSSynchronizationContext.MainJSSynchronizationContext ?? null; + return JSSynchronizationContext.CurrentJSSynchronizationContext ?? JSSynchronizationContext.MainJSSynchronizationContext!; #else - return null; + return null!; #endif } } diff --git a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs index 0e3fb01dcd278..6f63db9cfd337 100644 --- a/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs +++ b/src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/SynchronizationContextExtensions.cs @@ -11,14 +11,8 @@ namespace System.Runtime.InteropServices.JavaScript /// public static class SynchronizationContextExtension { - public static void Send(this SynchronizationContext? self, Action body, T value) + public static void Send(this SynchronizationContext self, Action body, T value) { - if (self == null) - { - body(value); - return; - } - Exception? exc = default; self.Send((_value) => { @@ -37,10 +31,8 @@ public static void Send(this SynchronizationContext? self, Action body, T } } - public static TRes Send(this SynchronizationContext? self, Func body) + public static TRes Send(this SynchronizationContext self, Func body) { - if (self == null) return body(); - TRes? value = default; Exception? exc = default; self.Send((_) => @@ -61,10 +53,8 @@ public static TRes Send(this SynchronizationContext? self, Func body return value!; } - public static Task Post(this SynchronizationContext? self, Func> body) + public static Task Post(this SynchronizationContext self, Func> body) { - if (self == null) return body(); - TaskCompletionSource tcs = new TaskCompletionSource(); self.Post(async (_) => { @@ -101,10 +91,8 @@ public static Task Post(this SynchronizationContext? self, Func< return tcs.Task; } - public static Task Post(this SynchronizationContext? self, Func body, T1 p1) + public static Task Post(this SynchronizationContext self, Func body, T1 p1) { - if (self == null) return body(p1); - TaskCompletionSource tcs = new TaskCompletionSource(); self.Post(async (_) => { @@ -121,10 +109,8 @@ public static Task Post(this SynchronizationContext? self, Func bo return tcs.Task; } - public static Task Post(this SynchronizationContext? self, Func body) + public static Task Post(this SynchronizationContext self, Func body) { - if (self == null) return body(); - TaskCompletionSource tcs = new TaskCompletionSource(); self.Post(async (_) => { @@ -141,10 +127,8 @@ public static Task Post(this SynchronizationContext? self, Func body) return tcs.Task; } - public static TRes Send(this SynchronizationContext? self, Func body, T1 p1) + public static TRes Send(this SynchronizationContext self, Func body, T1 p1) { - if (self == null) return body(p1); - TRes? value = default; Exception? exc = default; self.Send((_) => diff --git a/src/mono/wasm/runtime/web-socket.ts b/src/mono/wasm/runtime/web-socket.ts index c5a69ddf267ae..9c43849f39534 100644 --- a/src/mono/wasm/runtime/web-socket.ts +++ b/src/mono/wasm/runtime/web-socket.ts @@ -434,10 +434,16 @@ type Message = { } function resolvedPromise(): Promise | null { - if (MonoWasmThreads) { - return wrap_as_cancelable(Promise.resolve()); - } else { - // finish synchronously + if (!MonoWasmThreads) { + // signal that we are finished synchronously + // this is optimization, which doesn't allocate and doesn't require to marshal resolve() call to C# side. return null; + } else { + // passing synchronous `null` as value of the result of the async JSImport function is not possible when there is message sent across threads. + const resolved = Promise.resolve(); + // the C# code in the BrowserWebSocket expects that promise returned from this code is instance of `ControllablePromise` + // so that C# side could call `mono_wasm_cancel_promise` on it. + // in practice the `resolve()` callback would arrive before the `reject()` of the cancelation. + return wrap_as_cancelable(resolved); } } \ No newline at end of file