Skip to content

Commit

Permalink
Mark response as complete before WebSocket upgrade
Browse files Browse the repository at this point in the history
Prior to this commit, some WebSocket `RequestUpgradeStrategy` reactive
implementations would prevent the application from writing HTTP headers
and cookies to the response.

For Reactor Netty and Undertow, handling the upgrade and starting the
WebSocket communication marks the response status and headers as sent
and the application cannot update HTTP response headers after that.

This commit ensures that the `RequestUpgradeStrategy` implementations
mark the responses as "complete", so that headers are written before we
delegate to the server implementation.

Fixes gh-24475
  • Loading branch information
bclozel committed Feb 13, 2020
1 parent 4cbc61a commit 13f23dc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
HttpServerResponse reactorResponse = getNativeResponse(response);
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, this.handlePing,
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
URI uri = exchange.getRequest().getURI();
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
});
return response.setComplete()
.then(Mono.defer(() -> reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, this.handlePing,
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
URI uri = exchange.getRequest().getURI();
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
})));
}

private static HttpServerResponse getNativeResponse(ServerHttpResponse response) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,10 +43,11 @@
import org.springframework.web.server.ServerWebExchange;

/**
* A {@link RequestUpgradeStrategy} for use with Undertow.
*
* A {@link RequestUpgradeStrategy} for use with Undertow.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @author Brian Clozel
* @since 5.0
*/
public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
Expand All @@ -63,16 +64,12 @@ public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,

HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();

try {
DefaultCallback callback = new DefaultCallback(handshakeInfo, handler, bufferFactory);
new WebSocketProtocolHandshakeHandler(handshakes, callback).handleRequest(httpExchange);
}
catch (Exception ex) {
return Mono.error(ex);
}

return Mono.empty();
return exchange.getResponse().setComplete()
.then(Mono.fromCallable(() -> {
DefaultCallback callback = new DefaultCallback(handshakeInfo, handler, bufferFactory);
new WebSocketProtocolHandshakeHandler(handshakes, callback).handleRequest(httpExchange);
return null;
}));
}

private static HttpServerExchange getNativeRequest(ServerHttpRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,9 +33,11 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseCookie;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.server.WebFilter;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -45,6 +47,7 @@
*
* @author Rossen Stoyanchev
* @author Sam Brannen
* @author Brian Clozel
*/
class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {

Expand Down Expand Up @@ -91,6 +94,7 @@ void subProtocol(WebSocketClient client, HttpServer server, Class<?> serverConfi
public List<String> getSubProtocols() {
return Collections.singletonList(protocol);
}

@Override
public Mono<Void> handle(WebSocketSession session) {
infoRef.set(session.getHandshakeInfo());
Expand Down Expand Up @@ -138,12 +142,31 @@ void sessionClosing(WebSocketClient client, HttpServer server, Class<?> serverCo
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doFinally(signalType ->
logger.debug("Completed with: " + signalType)
logger.debug("Completed with: " + signalType)
);
})
.block(TIMEOUT);
}

@ParameterizedWebSocketTest
void cookie(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception {
startServer(client, server, serverConfigClass);

MonoProcessor<Object> output = MonoProcessor.create();
AtomicReference<String> cookie = new AtomicReference<>();
this.client.execute(getUrl("/cookie"),
session -> {
cookie.set(session.getHandshakeInfo().getHeaders().getFirst("Set-Cookie"));
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.then();
})
.block(TIMEOUT);
assertThat(output.block(TIMEOUT)).isEqualTo("cookie");
assertThat(cookie.get()).isEqualTo("project=spring");
}


@Configuration
static class WebConfig {
Expand All @@ -155,8 +178,19 @@ public HandlerMapping handlerMapping() {
map.put("/sub-protocol", new SubProtocolWebSocketHandler());
map.put("/custom-header", new CustomHeaderHandler());
map.put("/close", new SessionClosingHandler());
map.put("/cookie", new CookieHandler());
return new SimpleUrlHandlerMapping(map);
}

@Bean
public WebFilter cookieWebFilter() {
return (exchange, chain) -> {
if (exchange.getRequest().getPath().value().startsWith("/cookie")) {
exchange.getResponse().addCookie(ResponseCookie.from("project", "spring").build());
}
return chain.filter(exchange);
};
}
}


Expand Down Expand Up @@ -209,4 +243,13 @@ public Mono<Void> handle(WebSocketSession session) {
}
}

private static class CookieHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
WebSocketMessage message = session.textMessage("cookie");
return session.send(Mono.just(message));
}
}

}

0 comments on commit 13f23dc

Please sign in to comment.