Skip to content

Commit

Permalink
Provides a new implementation for SSE in the webserver. This implemen…
Browse files Browse the repository at this point in the history
…tation does not use the normal output stream to serialize the events to avoid problems with buffering and chunked encoding. Instead, it writes directly to the underlying socket writer and flushes data as needed. As a result, chunked encoding is no longer used for SSE. Several tests have been updated.
  • Loading branch information
spericas committed Aug 29, 2024
1 parent 7f7da14 commit acd1afa
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,15 @@ public SocketHttpClient sendChunk(String payload) throws IOException {
return this;
}

/**
* Provides access to underlying socket reader.
*
* @return the reader
*/
public BufferedReader socketReader() {
return socketReader;
}

/**
* Override this to send a specific payload.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,23 +411,31 @@ public int read() {
if (finished) {
return -1;
}
ensureBuffer(512);
if (finished || currentBuffer == null) {
try {
ensureBuffer(512);
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read();
} catch (DataReader.InsufficientDataAvailableException e) {
return -1;
}
return currentBuffer.read();
}

@Override
public int read(byte[] b, int off, int len) {
if (finished) {
return -1;
}
ensureBuffer(len);
if (finished || currentBuffer == null) {
try {
ensureBuffer(len);
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read(b, off, len);
} catch (DataReader.InsufficientDataAvailableException e) {
return -1;
}
return currentBuffer.read(b, off, len);
}

private void ensureBuffer(int estimate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public <X extends Source<SseEvent>> void handle(X source, HttpClientResponse res
emit = false;
}
}

source.onClose();
} catch (IOException e) {
source.onError(e);
Expand Down
165 changes: 116 additions & 49 deletions webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,33 @@

package io.helidon.webserver.sse;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.BiConsumer;

import io.helidon.common.GenericType;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.media.type.MediaType;
import io.helidon.common.media.type.MediaTypes;
import io.helidon.http.DateTime;
import io.helidon.http.Header;
import io.helidon.http.HeaderNames;
import io.helidon.http.HttpMediaType;
import io.helidon.http.ServerResponseHeaders;
import io.helidon.http.Status;
import io.helidon.http.WritableHeaders;
import io.helidon.http.media.EntityWriter;
import io.helidon.http.media.MediaContext;
import io.helidon.http.sse.SseEvent;
import io.helidon.webserver.ConnectionContext;
import io.helidon.webserver.http.ServerResponse;
import io.helidon.webserver.http.spi.Sink;
import io.helidon.webserver.http.spi.SinkProviderContext;

import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM;
import static io.helidon.http.HeaderValues.create;

/**
* Implementation of an SSE sink. Emits {@link SseEvent}s.
Expand All @@ -44,71 +54,128 @@ public class SseSink implements Sink<SseEvent> {
*/
public static final GenericType<SseSink> TYPE = GenericType.create(SseSink.class);

private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache");
private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8);
private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8);

private final BiConsumer<Object, MediaType> eventConsumer;
private final Runnable closeRunnable;
private final OutputStream outputStream;

SseSink(ServerResponse serverResponse, BiConsumer<Object, MediaType> eventConsumer, Runnable closeRunnable) {
// Verify response has no status or content type
HttpMediaType ct = serverResponse.headers().contentType().orElse(null);
if (serverResponse.status().code() != Status.OK_200.code()
|| ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) {
throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse");
}
private static final WritableHeaders<?> EMPTY_HEADERS = WritableHeaders.create();

// Ensure content type set for SSE
if (ct == null) {
serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM);
}
private final ServerResponse response;
private final ConnectionContext ctx;
private final MediaContext mediaContext;
private final Runnable closeRunnable;

this.outputStream = serverResponse.outputStream();
this.eventConsumer = eventConsumer;
this.closeRunnable = closeRunnable;
SseSink(SinkProviderContext context) {
this.response = context.serverResponse();
this.ctx = context.connectionContext();
this.mediaContext = ctx.listenerContext().mediaContext();
this.closeRunnable = context.closeRunnable();
initResponse();
}

@Override
public SseSink emit(SseEvent sseEvent) {
try {
Optional<String> comment = sseEvent.comment();
if (comment.isPresent()) {
outputStream.write(SSE_COMMENT);
outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Optional<String> id = sseEvent.id();
if (id.isPresent()) {
outputStream.write(SSE_ID);
outputStream.write(id.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Optional<String> name = sseEvent.name();
if (name.isPresent()) {
outputStream.write(SSE_EVENT);
outputStream.write(name.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Object data = sseEvent.data();
if (data != SseEvent.NO_DATA) {
outputStream.write(SSE_DATA);
eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN));
outputStream.write(SSE_NL);
}
outputStream.write(SSE_NL);
outputStream.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
BufferData bufferData = BufferData.growing(512);

Optional<String> comment = sseEvent.comment();
if (comment.isPresent()) {
bufferData.write(SSE_COMMENT);
bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Optional<String> id = sseEvent.id();
if (id.isPresent()) {
bufferData.write(SSE_ID);
bufferData.write(id.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Optional<String> name = sseEvent.name();
if (name.isPresent()) {
bufferData.write(SSE_EVENT);
bufferData.write(name.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Object data = sseEvent.data();
if (data != null) {
bufferData.write(SSE_DATA);
byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN));
bufferData.write(bytes);
bufferData.write(SSE_NL);
}
bufferData.write(SSE_NL);

// write event to the network
ctx.dataWriter().writeNow(bufferData);
return this;
}

@Override
public void close() {
closeRunnable.run();
ctx.serverSocket().close();
}

void initResponse() {
ServerResponseHeaders headers = response.headers();

// verify response has no status or content type
HttpMediaType ct = headers.contentType().orElse(null);
if (response.status().code() != Status.OK_200.code()
|| ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) {
throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse");
}
if (ct == null) {
headers.add(CONTENT_TYPE_EVENT_STREAM);
}
headers.add(CACHE_NO_CACHE_ONLY);

// start writing heading to buffer
BufferData buffer = BufferData.growing(256);
buffer.write(OK_200);

// serialize headers
if (!headers.contains(HeaderNames.DATE)) {
buffer.write(DATE);
byte[] dateBytes = DateTime.http1Bytes();
buffer.write(dateBytes);
}
for (Header header : headers) {
header.writeHttp1Header(buffer);
}

// complete heading
buffer.write('\r'); // "\r\n" - empty line after headers
buffer.write('\n');

// write response heading to the network
ctx.dataWriter().writeNow(buffer);
}

private byte[] serializeData(Object object, MediaType mediaType) {
if (object instanceof byte[] bytes) {
return bytes;
} else if (mediaContext != null) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) {
EntityWriter<String> writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS);
writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS);
} else {
GenericType<Object> type = GenericType.create(object);
WritableHeaders<?> resHeaders = WritableHeaders.create();
resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text());
EntityWriter<Object> writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders);
writer.write(type, object, baos, EMPTY_HEADERS, resHeaders);
}
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
throw new IllegalStateException("Unable to serialize SSE event without a media context");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import io.helidon.webserver.http.ServerResponse;
import io.helidon.webserver.http.spi.Sink;
import io.helidon.webserver.http.spi.SinkProvider;
import io.helidon.webserver.http.spi.SinkProviderContext;

/**
* Sink provider for SSE type.
Expand All @@ -37,10 +38,18 @@ public boolean supports(GenericType<? extends Sink<?>> type, ServerRequest reque
return SseSink.TYPE.equals(type) && request.headers().isAccepted(MediaTypes.TEXT_EVENT_STREAM);
}


@Override
@SuppressWarnings("unchecked")
public <X extends Sink<SseEvent>> X create(ServerResponse response, BiConsumer<Object, MediaType> eventConsumer,
Runnable closeRunnable) {
return (X) new SseSink(response, eventConsumer, closeRunnable);
public <X extends Sink<SseEvent>> X create(SinkProviderContext context) {
return (X) new SseSink(context);
}

@Override
public <X extends Sink<SseEvent>> X create(ServerResponse response,
BiConsumer<Object, MediaType> eventConsumer,
Runnable closeRunnable) {
throw new UnsupportedOperationException("Deprecated, use other create method in class");
}

}
3 changes: 2 additions & 1 deletion webserver/sse/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@

requires static io.helidon.common.features.api;

requires io.helidon.common.socket;
requires transitive io.helidon.common;
requires transitive io.helidon.http.sse;
requires transitive io.helidon.webserver;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,4 +138,9 @@ public DirectHandlers directHandlers() {
public ListenerConfig config() {
return listenerConfiguration;
}

@Override
public HelidonSocket serverSocket() {
return serverSocket;
}
}
Loading

0 comments on commit acd1afa

Please sign in to comment.