From acd1afa8f50d2a714fa2ce42e808dd3e0f2a4f72 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Thu, 29 Aug 2024 15:53:58 -0400 Subject: [PATCH] Provides a new implementation for SSE in the webserver. This implementation does not use the normal output stream to serialize the events to avoid problems with buffering and chunked encoding. Instead, it writes directly to the underlying socket writer and flushes data as needed. As a result, chunked encoding is no longer used for SSE. Several tests have been updated. --- .../testing/http/junit5/SocketHttpClient.java | 9 + .../webclient/http1/Http1CallChainBase.java | 20 ++- .../sse/SseSourceHandlerProvider.java | 1 - .../io/helidon/webserver/sse/SseSink.java | 165 ++++++++++++------ .../webserver/sse/SseSinkProvider.java | 17 +- webserver/sse/src/main/java/module-info.java | 3 +- .../junit5/DirectClientServerContext.java | 7 +- .../webserver/tests/sse/SimpleSseClient.java | 132 ++++++++++++++ .../webserver/tests/sse/SseServerTest.java | 52 +++--- .../helidon/webserver/ConnectionContext.java | 14 +- .../helidon/webserver/ConnectionHandler.java | 5 + .../webserver/http/spi/SinkProvider.java | 18 +- .../http/spi/SinkProviderContext.java | 47 +++++ .../webserver/http1/Http1ServerResponse.java | 30 +++- 14 files changed, 428 insertions(+), 92 deletions(-) create mode 100644 webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java create mode 100644 webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java diff --git a/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java b/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java index 92ec6aed91a..75838cd1710 100644 --- a/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java +++ b/common/testing/http-junit5/src/main/java/io/helidon/common/testing/http/junit5/SocketHttpClient.java @@ -605,6 +605,15 @@ public SocketHttpClient sendChunk(String payload) throws IOException { return this; } + /** + * Provides access to underlying socket reader. + * + * @return the reader + */ + public BufferedReader socketReader() { + return socketReader; + } + /** * Override this to send a specific payload. * diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java index 3f616060f3d..4916af47c88 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1CallChainBase.java @@ -411,11 +411,15 @@ public int read() { if (finished) { return -1; } - ensureBuffer(512); - if (finished || currentBuffer == null) { + try { + ensureBuffer(512); + if (finished || currentBuffer == null) { + return -1; + } + return currentBuffer.read(); + } catch (DataReader.InsufficientDataAvailableException e) { return -1; } - return currentBuffer.read(); } @Override @@ -423,11 +427,15 @@ public int read(byte[] b, int off, int len) { if (finished) { return -1; } - ensureBuffer(len); - if (finished || currentBuffer == null) { + try { + ensureBuffer(len); + if (finished || currentBuffer == null) { + return -1; + } + return currentBuffer.read(b, off, len); + } catch (DataReader.InsufficientDataAvailableException e) { return -1; } - return currentBuffer.read(b, off, len); } private void ensureBuffer(int estimate) { diff --git a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java index 502bb1f8989..bf8e240ad31 100644 --- a/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java +++ b/webclient/sse/src/main/java/io/helidon/webclient/sse/SseSourceHandlerProvider.java @@ -92,7 +92,6 @@ public > void handle(X source, HttpClientResponse res emit = false; } } - source.onClose(); } catch (IOException e) { source.onError(e); diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java index 8cae4685a3e..74815bd88fe 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java @@ -16,23 +16,33 @@ package io.helidon.webserver.sse; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.Optional; -import java.util.function.BiConsumer; import io.helidon.common.GenericType; +import io.helidon.common.buffers.BufferData; import io.helidon.common.media.type.MediaType; import io.helidon.common.media.type.MediaTypes; +import io.helidon.http.DateTime; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; import io.helidon.http.HttpMediaType; +import io.helidon.http.ServerResponseHeaders; import io.helidon.http.Status; +import io.helidon.http.WritableHeaders; +import io.helidon.http.media.EntityWriter; +import io.helidon.http.media.MediaContext; import io.helidon.http.sse.SseEvent; +import io.helidon.webserver.ConnectionContext; import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.spi.Sink; +import io.helidon.webserver.http.spi.SinkProviderContext; import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM; +import static io.helidon.http.HeaderValues.create; /** * Implementation of an SSE sink. Emits {@link SseEvent}s. @@ -44,71 +54,128 @@ public class SseSink implements Sink { */ public static final GenericType TYPE = GenericType.create(SseSink.class); + private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache"); private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8); private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8); private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8); private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8); private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8); - private final BiConsumer eventConsumer; - private final Runnable closeRunnable; - private final OutputStream outputStream; - - SseSink(ServerResponse serverResponse, BiConsumer eventConsumer, Runnable closeRunnable) { - // Verify response has no status or content type - HttpMediaType ct = serverResponse.headers().contentType().orElse(null); - if (serverResponse.status().code() != Status.OK_200.code() - || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { - throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse"); - } + private static final WritableHeaders EMPTY_HEADERS = WritableHeaders.create(); - // Ensure content type set for SSE - if (ct == null) { - serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM); - } + private final ServerResponse response; + private final ConnectionContext ctx; + private final MediaContext mediaContext; + private final Runnable closeRunnable; - this.outputStream = serverResponse.outputStream(); - this.eventConsumer = eventConsumer; - this.closeRunnable = closeRunnable; + SseSink(SinkProviderContext context) { + this.response = context.serverResponse(); + this.ctx = context.connectionContext(); + this.mediaContext = ctx.listenerContext().mediaContext(); + this.closeRunnable = context.closeRunnable(); + initResponse(); } @Override public SseSink emit(SseEvent sseEvent) { - try { - Optional comment = sseEvent.comment(); - if (comment.isPresent()) { - outputStream.write(SSE_COMMENT); - outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Optional id = sseEvent.id(); - if (id.isPresent()) { - outputStream.write(SSE_ID); - outputStream.write(id.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Optional name = sseEvent.name(); - if (name.isPresent()) { - outputStream.write(SSE_EVENT); - outputStream.write(name.get().getBytes(StandardCharsets.UTF_8)); - outputStream.write(SSE_NL); - } - Object data = sseEvent.data(); - if (data != SseEvent.NO_DATA) { - outputStream.write(SSE_DATA); - eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); - outputStream.write(SSE_NL); - } - outputStream.write(SSE_NL); - outputStream.flush(); - } catch (IOException e) { - throw new UncheckedIOException(e); + BufferData bufferData = BufferData.growing(512); + + Optional comment = sseEvent.comment(); + if (comment.isPresent()) { + bufferData.write(SSE_COMMENT); + bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Optional id = sseEvent.id(); + if (id.isPresent()) { + bufferData.write(SSE_ID); + bufferData.write(id.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); } + Optional name = sseEvent.name(); + if (name.isPresent()) { + bufferData.write(SSE_EVENT); + bufferData.write(name.get().getBytes(StandardCharsets.UTF_8)); + bufferData.write(SSE_NL); + } + Object data = sseEvent.data(); + if (data != null) { + bufferData.write(SSE_DATA); + byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN)); + bufferData.write(bytes); + bufferData.write(SSE_NL); + } + bufferData.write(SSE_NL); + + // write event to the network + ctx.dataWriter().writeNow(bufferData); return this; } @Override public void close() { closeRunnable.run(); + ctx.serverSocket().close(); + } + + void initResponse() { + ServerResponseHeaders headers = response.headers(); + + // verify response has no status or content type + HttpMediaType ct = headers.contentType().orElse(null); + if (response.status().code() != Status.OK_200.code() + || ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) { + throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse"); + } + if (ct == null) { + headers.add(CONTENT_TYPE_EVENT_STREAM); + } + headers.add(CACHE_NO_CACHE_ONLY); + + // start writing heading to buffer + BufferData buffer = BufferData.growing(256); + buffer.write(OK_200); + + // serialize headers + if (!headers.contains(HeaderNames.DATE)) { + buffer.write(DATE); + byte[] dateBytes = DateTime.http1Bytes(); + buffer.write(dateBytes); + } + for (Header header : headers) { + header.writeHttp1Header(buffer); + } + + // complete heading + buffer.write('\r'); // "\r\n" - empty line after headers + buffer.write('\n'); + + // write response heading to the network + ctx.dataWriter().writeNow(buffer); + } + + private byte[] serializeData(Object object, MediaType mediaType) { + if (object instanceof byte[] bytes) { + return bytes; + } else if (mediaContext != null) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) { + EntityWriter writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS); + writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS); + } else { + GenericType type = GenericType.create(object); + WritableHeaders resHeaders = WritableHeaders.create(); + resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text()); + EntityWriter writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders); + writer.write(type, object, baos, EMPTY_HEADERS, resHeaders); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + throw new IllegalStateException("Unable to serialize SSE event without a media context"); } } diff --git a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java index 466a1d5d126..ee73ddabb35 100644 --- a/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java +++ b/webserver/sse/src/main/java/io/helidon/webserver/sse/SseSinkProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.spi.Sink; import io.helidon.webserver.http.spi.SinkProvider; +import io.helidon.webserver.http.spi.SinkProviderContext; /** * Sink provider for SSE type. @@ -37,10 +38,18 @@ public boolean supports(GenericType> type, ServerRequest reque return SseSink.TYPE.equals(type) && request.headers().isAccepted(MediaTypes.TEXT_EVENT_STREAM); } + @Override @SuppressWarnings("unchecked") - public > X create(ServerResponse response, BiConsumer eventConsumer, - Runnable closeRunnable) { - return (X) new SseSink(response, eventConsumer, closeRunnable); + public > X create(SinkProviderContext context) { + return (X) new SseSink(context); } + + @Override + public > X create(ServerResponse response, + BiConsumer eventConsumer, + Runnable closeRunnable) { + throw new UnsupportedOperationException("Deprecated, use other create method in class"); + } + } diff --git a/webserver/sse/src/main/java/module-info.java b/webserver/sse/src/main/java/module-info.java index 571dbf8ebf3..d3c9fcd3ef1 100644 --- a/webserver/sse/src/main/java/module-info.java +++ b/webserver/sse/src/main/java/module-info.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ requires static io.helidon.common.features.api; + requires io.helidon.common.socket; requires transitive io.helidon.common; requires transitive io.helidon.http.sse; requires transitive io.helidon.webserver; diff --git a/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java b/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java index 03b59ad0f31..e74626ebf81 100644 --- a/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java +++ b/webserver/testing/junit5/junit5/src/main/java/io/helidon/webserver/testing/junit5/DirectClientServerContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -138,4 +138,9 @@ public DirectHandlers directHandlers() { public ListenerConfig config() { return listenerConfiguration; } + + @Override + public HelidonSocket serverSocket() { + return serverSocket; + } } diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java new file mode 100644 index 00000000000..967b0552157 --- /dev/null +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SimpleSseClient.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.webserver.tests.sse; + +import java.io.BufferedReader; +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; + +import io.helidon.common.testing.http.junit5.SocketHttpClient; +import io.helidon.http.Method; + +class SimpleSseClient implements AutoCloseable { + + public enum State { + DISCONNECTED, + CONNECTED, + HEADERS_READ, + ERROR + } + + private State state = State.DISCONNECTED; + private BufferedReader reader; + private final String path; + private final SocketHttpClient client; + + public static SimpleSseClient create(int port, String path) { + return create("localhost", port, path, Duration.ofSeconds(10)); + } + + public static SimpleSseClient create(int port, String path, Duration timeout) { + return create("localhost", port, path, timeout); + } + + public static SimpleSseClient create(String host, int port, String path, Duration timeout) { + return new SimpleSseClient("localhost", port, path, timeout); + } + + private SimpleSseClient(String host, int port, String path, Duration timeout) { + this.path = path; + this.client = SocketHttpClient.create(host, port, timeout); + } + + public String nextEvent() { + ensureConnected(); + ensureHeadersRead(); + + try { + String line; + StringBuilder event = new StringBuilder(); + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + return event.toString(); + } + if (!event.isEmpty()) { + event.append("\n"); + } + event.append(line); + } + if (event.isEmpty()) { + return null; + } + state = State.ERROR; + throw new RuntimeException("Unable to parse response"); + } catch (IOException e) { + state = State.ERROR; + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + client.close(); + state = State.DISCONNECTED; + + } + + public State state() { + return state; + } + + private void ensureConnected() { + if (state == State.DISCONNECTED) { + client.request(Method.GET.toString(), + path, + "HTTP/1.1", + "localhost", + Collections.emptyList(), + null); + reader = client.socketReader(); + state = State.CONNECTED; + } + } + + private void ensureHeadersRead() { + if (state == State.CONNECTED) { + try { + String line; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + state = State.HEADERS_READ; + return; + } + line = line.toLowerCase(); + if (line.contains("http/1.1") && !line.contains("200")) { + throw new RuntimeException("Invalid status code in response"); + } else if (line.contains("content-type") && !line.contains("text/event-stream")) { + throw new RuntimeException("Invalid content-type in response"); + } + } + state = State.ERROR; + throw new RuntimeException("Unable to parse response"); + } catch (IOException e) { + state = State.ERROR; + throw new RuntimeException(e); + } + } + } +} diff --git a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java index cf49ddbe2d8..add41b56af0 100644 --- a/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java +++ b/webserver/tests/sse/src/test/java/io/helidon/webserver/tests/sse/SseServerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,24 +19,25 @@ import io.helidon.http.Status; import io.helidon.webclient.http1.Http1Client; import io.helidon.webclient.http1.Http1ClientResponse; +import io.helidon.webserver.WebServer; import io.helidon.webserver.http.HttpRules; import io.helidon.webserver.testing.junit5.ServerTest; import io.helidon.webserver.testing.junit5.SetUpRoute; import org.junit.jupiter.api.Test; -import static io.helidon.http.HeaderValues.ACCEPT_EVENT_STREAM; import static io.helidon.http.HeaderValues.ACCEPT_JSON; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @ServerTest class SseServerTest extends SseBaseTest { - private final Http1Client client; + private final WebServer webServer; - SseServerTest(Http1Client client) { - this.client = client; + SseServerTest(WebServer webServer) { + this.webServer = webServer; } @SetUpRoute @@ -50,49 +51,52 @@ static void routing(HttpRules rules) { } @Test - void testSseString1() { - testSse("/sseString1", "data:hello\n\ndata:world\n\n"); + void testSseString1() throws Exception { + testSse("/sseString1", "data:hello", "data:world"); } @Test - void testSseString2() { - testSse("/sseString2", "data:1\n\ndata:2\n\ndata:3\n\n"); + void testSseString2() throws Exception { + testSse("/sseString2", "data:1", "data:2", "data:3"); } @Test - void testSseJson1() { - testSse("/sseJson1", "data:{\"hello\":\"world\"}\n\n"); + void testSseJson1() throws Exception { + testSse("/sseJson1", "data:{\"hello\":\"world\"}"); } @Test - void testSseJson2() { - testSse("/sseJson2", "data:{\"hello\":\"world\"}\n\n"); + void testSseJson2() throws Exception { + testSse("/sseJson2", "data:{\"hello\":\"world\"}"); } @Test - void testSseMixed() { - testSse("/sseMixed", - "data:hello\n\ndata:world\n\n" + - "data:{\"hello\":\"world\"}\n\n" + - "data:{\"hello\":\"world\"}\n\n"); + void testSseMixed() throws Exception { + testSse("/sseMixed", "data:hello", "data:world", + "data:{\"hello\":\"world\"}", "data:{\"hello\":\"world\"}"); } @Test - void testIdComment() { - testSse("/sseIdComment", ":This is a comment\nid:1\ndata:hello\n\n"); + void testIdComment() throws Exception { + testSse("/sseIdComment", ":This is a comment\nid:1\ndata:hello"); } @Test void testWrongAcceptType() { + Http1Client client = Http1Client.builder() + .baseUri("http://localhost:" + webServer.port()) + .build(); try (Http1ClientResponse response = client.get("/sseString1").header(ACCEPT_JSON).request()) { assertThat(response.status(), is(Status.NOT_ACCEPTABLE_406)); } } - private void testSse(String path, String result) { - try (Http1ClientResponse response = client.get(path).header(ACCEPT_EVENT_STREAM).request()) { - assertThat(response.status(), is(Status.OK_200)); - assertThat(response.as(String.class), is(result)); + private void testSse(String path, String... events) throws Exception { + try (SimpleSseClient sseClient = SimpleSseClient.create(webServer.port(), path)) { + for (String e : events) { + assertThat(sseClient.nextEvent(), is(e)); + } + assertThat(sseClient.nextEvent(), is(nullValue())); } } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java index 1c14c051e91..c267d69dfa3 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.helidon.common.buffers.DataReader; import io.helidon.common.buffers.DataWriter; +import io.helidon.common.socket.HelidonSocket; import io.helidon.common.socket.SocketContext; /** @@ -58,7 +59,7 @@ public interface ConnectionContext extends SocketContext { /** * Router that may contain routings of different types (HTTP, WebSocket, grpc). * - * @return rouer + * @return the router */ Router router(); @@ -71,4 +72,13 @@ public interface ConnectionContext extends SocketContext { default Optional proxyProtocolData() { return Optional.empty(); } + + /** + * The underlying network socket for the connection. + * + * @return the socket + */ + default HelidonSocket serverSocket() { + throw new UnsupportedOperationException("Not supported"); + } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java index ab049a295d1..1b4db4ef65e 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ConnectionHandler.java @@ -242,6 +242,11 @@ public Optional proxyProtocolData() { return Optional.ofNullable(proxyProtocolData); } + @Override + public HelidonSocket serverSocket() { + return helidonSocket; + } + private ServerConnection identifyConnection() { // if just one candidate, take a chance with it if (providerCandidates.size() == 1) { diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java index bae84c1f343..f105a8ab8cd 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,21 @@ public interface SinkProvider { * @param closeRunnable a runnable to call on close * @param type of sink * @return newly created sink + * @deprecated Replaced by {@link #create(SinkProviderContext)} */ - > X create(ServerResponse response, BiConsumer eventConsumer, + @Deprecated(forRemoval = true, since = "4.1.2") + > X create(ServerResponse response, + BiConsumer eventConsumer, Runnable closeRunnable); + + /** + * Creates a sink using this provider. + * + * @param context a context for a sync provider + * @param type of sink + * @return newly created sink + */ + default > X create(SinkProviderContext context) { + throw new UnsupportedOperationException("Not implemented"); + } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java new file mode 100644 index 00000000000..7df4030d486 --- /dev/null +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http/spi/SinkProviderContext.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.webserver.http.spi; + +import io.helidon.webserver.ConnectionContext; +import io.helidon.webserver.http.ServerResponse; + +/** + * A context for {@link io.helidon.webserver.http.spi.SinkProvider}s supplied + * at creation time. + */ +public interface SinkProviderContext { + + /** + * Obtains the server response associated with this context. + * + * @return the server response + */ + ServerResponse serverResponse(); + + /** + * Obtains access to the connection context. + * + * @return the connection context + */ + ConnectionContext connectionContext(); + + /** + * Runnable to execute to close the response. + * + * @return the close runnable + */ + Runnable closeRunnable(); +} diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java index 40e6d09b228..1acac7b2b64 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/http1/Http1ServerResponse.java @@ -45,9 +45,11 @@ import io.helidon.http.media.MediaContext; import io.helidon.webserver.ConnectionContext; import io.helidon.webserver.ServerConnectionException; +import io.helidon.webserver.http.ServerResponse; import io.helidon.webserver.http.ServerResponseBase; import io.helidon.webserver.http.spi.Sink; import io.helidon.webserver.http.spi.SinkProvider; +import io.helidon.webserver.http.spi.SinkProviderContext; /** * An HTTP/1 server response. @@ -261,7 +263,31 @@ public void commit() { public > X sink(GenericType sinkType) { for (SinkProvider p : SINK_PROVIDERS) { if (p.supports(sinkType, request)) { - return (X) p.create(this, this::handleSinkData, this::commit); + try { + return (X) p.create(new SinkProviderContext() { + @Override + public ServerResponse serverResponse() { + return Http1ServerResponse.this; + } + + @Override + public ConnectionContext connectionContext() { + return Http1ServerResponse.this.ctx; + } + + @Override + public Runnable closeRunnable() { + return () -> { + Http1ServerResponse.this.isSent = true; + afterSend(); + request.reset(); + }; + } + }); + } catch (UnsupportedOperationException e) { + // deprecated - will be removed in 5.x + return (X) p.create(this, this::handleSinkData, this::commit); + } } } // Request not acceptable if provider not found @@ -307,7 +333,7 @@ private void handleSinkData(Object data, MediaType mediaType) { } } } catch (IOException e) { - throw new ServerConnectionException("Failed to write sink data", e); + throw new ServerConnectionException("Failed to write response", e); } }