Skip to content

Commit

Permalink
* http_client: preliminary support for sse
Browse files Browse the repository at this point in the history
Signed-off-by: neo <[email protected]>
  • Loading branch information
neowu committed Jul 31, 2024
1 parent 90b20f4 commit 5fb4fbc
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 30 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
## Change log

### 9.1.1 (7/11/2024 - )
### 9.1.1-b1 (7/11/2024 - )

* ws/sse: updated max process time
* kafka: update to 3.8.0
* http_client: preliminary support for sse
> example usage:
```
var request = new HTTPRequest(HTTPMethod.GET, "https://localhost:8443/sse");
try (EventSource source = client.sse(request)) {
for (EventSource.Event event : source) {
System.out.println(event.id() + " " + event.data());
}
}
```

### 9.1.0 (6/12/2024 - 7/9/2024)

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ apply(plugin = "project")

subprojects {
group = "core.framework"
version = "9.1.1"
version = "9.1.1-b1"
}

val elasticVersion = "8.14.0"
Expand Down
105 changes: 105 additions & 0 deletions core-ng/src/main/java/core/framework/http/EventSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package core.framework.http;

import core.framework.log.ActionLogContext;
import core.framework.util.StopWatch;
import okhttp3.ResponseBody;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/**
* @author neo
*/
public final class EventSource implements AutoCloseable, Iterable<EventSource.Event>, Iterator<EventSource.Event> {
private static final Logger LOGGER = LoggerFactory.getLogger(EventSource.class);
public final int statusCode;
public final Map<String, String> headers; // headers key is case insensitive

private final ResponseBody body;
private final int requestBodyLength;
private int responseBodyLength;
private long elapsed;

private String lastId;
private Event nextEvent;

public EventSource(int statusCode, Map<String, String> headers, ResponseBody body, int requestBodyLength, long elapsed) {
this.statusCode = statusCode;
this.headers = headers;
this.body = body;
this.requestBodyLength = requestBodyLength;
this.elapsed = elapsed;
}

@Override
public void close() {
ActionLogContext.track("sse", elapsed, responseBodyLength, requestBodyLength);
body.close();
}

@Override
public boolean hasNext() {
if (nextEvent != null) return true;
nextEvent = parseResponse(body.source());
return nextEvent != null;
}

@Override
public Event next() {
if (nextEvent != null || hasNext()) {
var event = nextEvent;
nextEvent = null;
return event;
} else {
throw new NoSuchElementException();
}
}

@Override
public Iterator<Event> iterator() {
return this;
}

private Event parseResponse(BufferedSource source) {
var watch = new StopWatch();
try {
while (true) {
String line = source.readUtf8Line();
if (line == null) return null;

if (line.isEmpty()) {
lastId = null;
continue;
}
LOGGER.debug("[sse] line={}", line);
responseBodyLength += line.length();
int index = line.indexOf(": ");
if (index == -1) continue;

String field = line.substring(0, index);
switch (field) {
case "id":
lastId = line.substring(index + 2);
break;
case "data":
String id = lastId;
lastId = null;
return new Event(id, line.substring(index + 2));
default: // ignore "event", "retry" and other fields
}
}
} catch (IOException e) {
throw new HTTPClientException("failed to read sse response, error=" + e.getMessage(), "HTTP_REQUEST_FAILED", e);
} finally {
elapsed += watch.elapsed();
}
}

public record Event(String id, String data) {
}
}
2 changes: 2 additions & 0 deletions core-ng/src/main/java/core/framework/http/HTTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ static HTTPClientBuilder builder() {
}

HTTPResponse execute(HTTPRequest request);

EventSource sse(HTTPRequest request);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core.framework.internal.http;

import core.framework.http.ContentType;
import core.framework.http.EventSource;
import core.framework.http.HTTPClient;
import core.framework.http.HTTPClientException;
import core.framework.http.HTTPHeaders;
Expand Down Expand Up @@ -70,21 +71,34 @@ public HTTPResponse execute(HTTPRequest request) {
}
}

long slowOperationThresholdInNanos(HTTPRequest request) {
if (request.slowOperationThreshold != null) return request.slowOperationThreshold.toNanos();
return slowOperationThresholdInNanos;
@Override
public EventSource sse(HTTPRequest request) {
var watch = new StopWatch();
request.headers.put(HTTPHeaders.ACCEPT, "text/event-stream");
int requestBodyLength = request.body == null ? 0 : request.body.length;
Request httpRequest = httpRequest(request);
try {
Response httpResponse = client.newCall(httpRequest).execute();
int statusCode = httpResponse.code();
logger.debug("[response] status={}", statusCode);
Map<String, String> headers = headers(httpResponse);
return new EventSource(statusCode, headers, httpResponse.body(), requestBodyLength, watch.elapsed());
} catch (IOException e) {
throw new HTTPClientException(Strings.format("http request failed, uri={}, error={}", request.uri, e.getMessage()), "HTTP_REQUEST_FAILED", e);
} finally {
long elapsed = watch.elapsed();
logger.debug("sse, elapsed={}", elapsed);
if (elapsed > slowOperationThresholdInNanos(request)) {
logger.warn(errorCode("SLOW_HTTP"), "slow http operation, method={}, uri={}, elapsed={}", request.method, request.uri, Duration.ofNanos(elapsed));
}
}
}

HTTPResponse response(Response httpResponse) throws IOException {
int statusCode = httpResponse.code();
logger.debug("[response] status={}", statusCode);

Map<String, String> headers = new TreeMap<>(CASE_INSENSITIVE_ORDER);
Headers httpHeaders = httpResponse.headers();
for (int i = 0; i < httpHeaders.size(); i++) {
headers.put(httpHeaders.name(i), httpHeaders.value(i));
}
logger.debug("[response] headers={}", new FieldMapLogParam(headers));
Map<String, String> headers = headers(httpResponse);

byte[] body;
if (statusCode == 204) {
Expand Down Expand Up @@ -143,6 +157,21 @@ Request httpRequest(HTTPRequest request) {
return builder.build();
}

private Map<String, String> headers(Response httpResponse) {
Map<String, String> headers = new TreeMap<>(CASE_INSENSITIVE_ORDER);
Headers httpHeaders = httpResponse.headers();
for (int i = 0; i < httpHeaders.size(); i++) {
headers.put(httpHeaders.name(i), httpHeaders.value(i));
}
logger.debug("[response] headers={}", new FieldMapLogParam(headers));
return headers;
}

long slowOperationThresholdInNanos(HTTPRequest request) {
if (request.slowOperationThreshold != null) return request.slowOperationThreshold.toNanos();
return slowOperationThresholdInNanos;
}

@Nullable
MediaType mediaType(ContentType contentType) {
if (contentType == null) return null; // generally body is always set with valid content type, but in theory contentType=null is considered legitimate, so here to support such case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ String build(String id, T event) {

String build(String id, String data) {
var builder = new StringBuilder(data.length() + 7 + (id == null ? 0 : id.length() + 4));
if (id != null) builder.append("id:").append(id).append('\n');
builder.append("data:").append(data).append("\n\n");
if (id != null) builder.append("id: ").append(id).append('\n');
builder.append("data: ").append(data).append("\n\n");

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void handle(HttpServerExchange exchange, StreamSinkChannel sink) {
support.context.add(channel);
exchange.addExchangeCompleteListener(new ServerSentEventCloseHandler<>(logManager, channel, support.context));

channel.send("retry:15000\n\n"); // set browser retry to 15s
channel.send("retry: 15000\n\n"); // set browser retry to 15s

request.session = ReadOnlySession.of(sessionManager.load(request, actionLog));
String lastEventId = exchange.getRequestHeaders().getLast(LAST_EVENT_ID);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core.framework.internal.http;

import core.framework.http.ContentType;
import core.framework.http.EventSource;
import core.framework.http.HTTPClientException;
import core.framework.http.HTTPHeaders;
import core.framework.http.HTTPMethod;
Expand Down Expand Up @@ -61,17 +62,17 @@ void httpRequest() {
@Test
void httpRequestWithInvalidURL() {
assertThatThrownBy(() -> httpClient.httpRequest(new HTTPRequest(HTTPMethod.HEAD, "//%%")))
.isInstanceOf(HTTPClientException.class)
.hasMessageContaining("uri is invalid");
.isInstanceOf(HTTPClientException.class)
.hasMessageContaining("uri is invalid");
}

@Test
void response() throws IOException {
Response httpResponse = new Response.Builder().request(new Request.Builder().url("http://localhost/uri").build())
.protocol(Protocol.HTTP_1_1).code(200).message("")
.header("content-type", "text/html")
.body(ResponseBody.create(Strings.bytes("<html/>"), MediaType.get("text/html")))
.build();
.protocol(Protocol.HTTP_1_1).code(200).message("")
.header("content-type", "text/html")
.body(ResponseBody.create(Strings.bytes("<html/>"), MediaType.get("text/html")))
.build();

HTTPResponse response = httpClient.response(httpResponse);
assertThat(response.statusCode).isEqualTo(200);
Expand All @@ -83,8 +84,8 @@ void response() throws IOException {
@Test
void responseWith204() throws IOException {
Response httpResponse = new Response.Builder().request(new Request.Builder().url("http://localhost/uri").build())
.protocol(Protocol.HTTP_2).code(204).message("")
.build();
.protocol(Protocol.HTTP_2).code(204).message("")
.build();

HTTPResponse response = httpClient.response(httpResponse);
assertThat(response.statusCode).isEqualTo(204);
Expand All @@ -96,10 +97,10 @@ void mediaType() {
assertThat(httpClient.mediaType(null)).isNull();

assertThat(httpClient.mediaType(ContentType.APPLICATION_JSON))
.isSameAs(httpClient.mediaType(ContentType.APPLICATION_JSON));
.isSameAs(httpClient.mediaType(ContentType.APPLICATION_JSON));

assertThat(httpClient.mediaType(ContentType.APPLICATION_FORM_URLENCODED).toString())
.isEqualTo(ContentType.APPLICATION_FORM_URLENCODED.mediaType);
.isEqualTo(ContentType.APPLICATION_FORM_URLENCODED.mediaType);
}

@Test
Expand All @@ -114,15 +115,40 @@ void slowOperationThresholdInNanos() {
@Test
void execute() throws IOException {
Response httpResponse = new Response.Builder().request(new Request.Builder().url("http://localhost/uri").build())
.protocol(Protocol.HTTP_1_1).code(200).message("OK")
.header("content-type", "text/html")
.body(ResponseBody.create(Strings.bytes("<html/>"), MediaType.get("text/html")))
.build();
.protocol(Protocol.HTTP_1_1).code(200).message("OK")
.header("content-type", "text/html")
.body(ResponseBody.create(Strings.bytes("<html/>"), MediaType.get("text/html")))
.build();
Call call = mock(Call.class);
when(okHttpClient.newCall(any())).thenReturn(call);
when(call.execute()).thenReturn(httpResponse);

HTTPResponse response = httpClient.execute(new HTTPRequest(HTTPMethod.GET, "http://localhost/uri"));
assertThat(response.statusCode).isEqualTo(200);
}

@Test
void sse() throws IOException {
Response httpResponse = new Response.Builder().request(new Request.Builder().url("http://localhost/sse").build())
.protocol(Protocol.HTTP_1_1).code(200).message("OK")
.header("content-type", "text/event-stream")
.body(ResponseBody.create(Strings.bytes("""
retry: 10000
id: 1
data: test
"""), MediaType.get("text/event-stream")))
.build();
Call call = mock(Call.class);
when(okHttpClient.newCall(any())).thenReturn(call);
when(call.execute()).thenReturn(httpResponse);

try (EventSource response = httpClient.sse(new HTTPRequest(HTTPMethod.GET, "http://localhost/uri"))) {
assertThat(response.statusCode).isEqualTo(200);
for (EventSource.Event event : response) {
assertThat(event.id()).isEqualTo("1");
assertThat(event.data()).isEqualTo("test");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ void createServerSentEventBuilder() {
@Test
void build() {
assertThat(builder.build("001", "message"))
.asString().isEqualTo("id:001\ndata:message\n\n");
.asString().isEqualTo("id: 001\ndata: message\n\n");

assertThat(builder.build(null, "message"))
.asString().isEqualTo("data:message\n\n");
.asString().isEqualTo("data: message\n\n");
}
}

0 comments on commit 5fb4fbc

Please sign in to comment.