Skip to content

Commit

Permalink
Merge pull request #20557 from michalszynkiewicz/reactive-messaging-h…
Browse files Browse the repository at this point in the history
…ttp-better-error

Improved error handling in Reactive Messaging HTTP
  • Loading branch information
michalszynkiewicz authored Oct 7, 2021
2 parents 9a2129e + e29bcde commit 3271bbd
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void handle(RoutingContext event) {
if (bundle != null) {
MultiEmitter<? super MessageType> emitter = bundle.emitter;
StrictQueueSizeGuard guard = bundle.guard;
handleRequest(event, emitter, guard);
handleRequest(event, emitter, guard, bundle.path);
} else {
event.response().setStatusCode(404).end();
}
Expand All @@ -41,6 +41,7 @@ private void addProcessor(ConfigType streamConfig) {
.<MessageType> emitter(bundle::setEmitter, BackPressureStrategy.BUFFER)
.onItem().invoke(guard::dequeue);
bundle.setProcessor(processor);
bundle.setPath(streamConfig.path);

Bundle<MessageType> previousProcessor = processors.put(key(streamConfig), bundle);
if (previousProcessor != null) {
Expand All @@ -49,7 +50,7 @@ private void addProcessor(ConfigType streamConfig) {
}

protected abstract void handleRequest(RoutingContext event, MultiEmitter<? super MessageType> emitter,
StrictQueueSizeGuard guard);
StrictQueueSizeGuard guard, String path);

protected abstract String description(ConfigType streamConfig);

Expand All @@ -63,6 +64,7 @@ protected class Bundle<MessageType> {
private final StrictQueueSizeGuard guard;
private Multi<MessageType> processor; // effectively final
private MultiEmitter<? super MessageType> emitter; // effectively final
private String path;

private Bundle(StrictQueueSizeGuard guard) {
this.guard = guard;
Expand All @@ -79,5 +81,9 @@ public void setEmitter(MultiEmitter<? super MessageType> emitter) {
public Multi<MessageType> getProcessor() {
return processor;
}

public void setPath(String path) {
this.path = path;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected String key(HttpStreamConfig streamConfig) {

@Override
protected String key(RoutingContext context) {
return key(context.normalisedPath(), context.request().method());
return key(context.normalizedPath(), context.request().method());
}

@Override
Expand All @@ -52,8 +52,11 @@ protected String description(HttpStreamConfig streamConfig) {

@Override
protected void handleRequest(RoutingContext event, MultiEmitter<? super HttpMessage<?>> emitter,
StrictQueueSizeGuard guard) {
if (guard.prepareToEmit()) {
StrictQueueSizeGuard guard, String path) {
if (emitter == null) {
onUnexpectedError(event, null,
"No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
HttpMessage<Buffer> message = new HttpMessage<>(event.getBody(), new IncomingHttpMetadata(event.request()),
() -> event.response().setStatusCode(202).end(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ReactiveWebSocketHandlerBean extends ReactiveHandlerBeanBase<WebSoc

@Override
protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocketMessage<?>> emitter,
StrictQueueSizeGuard guard) {
StrictQueueSizeGuard guard, String path) {
event.request().toWebSocket(
webSocket -> {
if (webSocket.failed()) {
Expand All @@ -37,7 +37,11 @@ protected void handleRequest(RoutingContext event, MultiEmitter<? super WebSocke
ServerWebSocket serverWebSocket = webSocket.result();
serverWebSocket.handler(
b -> {
if (guard.prepareToEmit()) {
if (emitter == null) {
onUnexpectedError(serverWebSocket, null,
"No consumer subscribed for messages sent to " +
"Reactive Messaging WebSocket endpoint on path: " + path);
} else if (guard.prepareToEmit()) {
try {
emitter.emit(new WebSocketMessage<>(b,
() -> serverWebSocket.write(Buffer.buffer("ACK")),
Expand Down Expand Up @@ -67,7 +71,7 @@ protected String key(WebSocketStreamConfig config) {

@Override
protected String key(RoutingContext context) {
return context.normalisedPath();
return context.normalizedPath();
}

@Override
Expand All @@ -78,7 +82,7 @@ protected Collection<WebSocketStreamConfig> configs() {
private void onUnexpectedError(ServerWebSocket serverWebSocket, Throwable error, String message) {
log.error(message, error);
// TODO some error message for the client? exception mapper would be best...
serverWebSocket.close((short) 3500, message);
serverWebSocket.close((short) 3500, "Unexpected error while processing the message");
}

Multi<WebSocketMessage<?>> getProcessor(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

public class HttpStreamConfig extends StreamConfigBase {
public final HttpMethod method;
public final String path;

public HttpStreamConfig(String path, String method, String name, int bufferSize) {
super(bufferSize);
this.path = path;
super(bufferSize, path);
this.method = toHttpMethod(method, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

public class StreamConfigBase {
public final int bufferSize;
public final String path;

public StreamConfigBase(int bufferSize) {
public StreamConfigBase(int bufferSize, String path) {
this.path = path;
this.bufferSize = bufferSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package io.quarkus.reactivemessaging.http.runtime.config;

public class WebSocketStreamConfig extends StreamConfigBase {
public final String path;

public WebSocketStreamConfig(String path, int bufferSize) {
super(bufferSize);
this.path = path;
super(bufferSize, path);
}

public String path() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private static RestClientsConfig getConfigRoot() {
InstanceHandle<RestClientsConfig> configHandle = Arc.container()
.instance(RestClientsConfig.class);
if (!configHandle.isAvailable()) {
throw new IllegalStateException("Unable to find the RestClientConfigRootProvider");
throw new IllegalStateException("Unable to find the RestClientConfigs");
}
return configHandle.get();
}
Expand Down

0 comments on commit 3271bbd

Please sign in to comment.