Skip to content

Commit

Permalink
Extract common http logic to server (#31311)
Browse files Browse the repository at this point in the history
This is related to #28898. With the addition of the http nio transport,
we now have two different modules that provide http transports.
Currently most of the http logic lives at the module level. However,
some of this logic can live in server. In particular, some of the
setting of headers, cors, and pipelining. This commit begins this moving
in that direction by introducing lower level abstraction (HttpChannel,
HttpRequest, and HttpResonse) that is implemented by the modules. The
higher level rest request and rest channel work can live entirely in
server.
  • Loading branch information
Tim-Brooks authored Jun 14, 2018
1 parent 6dd81ea commit fcf1e41
Show file tree
Hide file tree
Showing 51 changed files with 2,101 additions and 2,267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,252 +19,58 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.net.InetSocketAddress;

final class Netty4HttpChannel extends AbstractRestChannel {
public class Netty4HttpChannel implements HttpChannel {

private final Netty4HttpServerTransport transport;
private final Channel channel;
private final FullHttpRequest nettyRequest;
private final int sequence;
private final ThreadContext threadContext;
private final HttpHandlingSettings handlingSettings;

/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param sequence The pipelining sequence number for this request
* @param handlingSettings true if error messages should include stack traces.
* @param threadContext the thread context for the channel
*/
Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
super(request, handlingSettings.getDetailedErrorsEnabled());
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
this.sequence = sequence;
this.threadContext = threadContext;
this.handlingSettings = handlingSettings;
Netty4HttpChannel(Channel channel) {
this.channel = channel;
}

@Override
protected BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays);
}

@Override
public void sendResponse(RestResponse response) {
// if the response object was created upstream, then use it;
// otherwise, create a new one
ByteBuf buffer = Netty4Utils.toByteBuf(response.content());
final FullHttpResponse resp;
if (HttpMethod.HEAD.equals(nettyRequest.method())) {
resp = newResponse(Unpooled.EMPTY_BUFFER);
} else {
resp = newResponse(buffer);
}
resp.setStatus(getStatus(response.status()));

Netty4CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());

String opaque = nettyRequest.headers().get("X-Opaque-Id");
if (opaque != null) {
setHeaderField(resp, "X-Opaque-Id", opaque);
}

// Add all custom headers
addCustomHeaders(resp, response.getHeaders());
addCustomHeaders(resp, threadContext.getResponseHeaders());

BytesReference content = response.content();
boolean releaseContent = content instanceof Releasable;
boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
try {
// If our response doesn't specify a content-type header, set one
setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
// If our response has no content-length, calculate and set one
setHeaderField(resp, HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(buffer.readableBytes()), false);

addCookies(resp);

final ChannelPromise promise = channel.newPromise();

if (releaseContent) {
promise.addListener(f -> ((Releasable) content).close());
}

if (releaseBytesStreamOutput) {
promise.addListener(f -> bytesOutputOrNull().close());
}

if (isCloseConnection()) {
promise.addListener(ChannelFutureListener.CLOSE);
}

Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);

channel.writeAndFlush(newResponse, promise);
releaseContent = false;
releaseBytesStreamOutput = false;
} finally {
if (releaseContent) {
((Releasable) content).close();
}
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
}
}

private void setHeaderField(HttpResponse resp, String headerField, String value) {
setHeaderField(resp, headerField, value, true);
}

private void setHeaderField(HttpResponse resp, String headerField, String value, boolean override) {
if (override || !resp.headers().contains(headerField)) {
resp.headers().add(headerField, value);
}
}

private void addCookies(HttpResponse resp) {
if (handlingSettings.isResetCookies()) {
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<io.netty.handler.codec.http.cookie.Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
resp.headers().set(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookies));
}
}
}
}

private void addCustomHeaders(HttpResponse response, Map<String, List<String>> customHeaders) {
if (customHeaders != null) {
for (Map.Entry<String, List<String>> headerEntry : customHeaders.entrySet()) {
for (String headerValue : headerEntry.getValue()) {
setHeaderField(response, headerEntry.getKey(), headerValue);
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
}
});
channel.writeAndFlush(response, writePromise);
}

// Determine if the request protocol version is HTTP 1.0
private boolean isHttp10() {
return nettyRequest.protocolVersion().equals(HttpVersion.HTTP_1_0);
}

// Determine if the request connection should be closed on completion.
private boolean isCloseConnection() {
final boolean http10 = isHttp10();
return HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(nettyRequest.headers().get(HttpHeaderNames.CONNECTION)) ||
(http10 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(nettyRequest.headers().get(HttpHeaderNames.CONNECTION)));
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}

// Create a new {@link HttpResponse} to transmit the response for the netty request.
private FullHttpResponse newResponse(ByteBuf buffer) {
final boolean http10 = isHttp10();
final boolean close = isCloseConnection();
// Build the response object.
final HttpResponseStatus status = HttpResponseStatus.OK; // default to initialize
final FullHttpResponse response;
if (http10) {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, status, buffer);
if (!close) {
response.headers().add(HttpHeaderNames.CONNECTION, "Keep-Alive");
}
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buffer);
}
return response;
@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}

private static Map<RestStatus, HttpResponseStatus> MAP;

static {
EnumMap<RestStatus, HttpResponseStatus> map = new EnumMap<>(RestStatus.class);
map.put(RestStatus.CONTINUE, HttpResponseStatus.CONTINUE);
map.put(RestStatus.SWITCHING_PROTOCOLS, HttpResponseStatus.SWITCHING_PROTOCOLS);
map.put(RestStatus.OK, HttpResponseStatus.OK);
map.put(RestStatus.CREATED, HttpResponseStatus.CREATED);
map.put(RestStatus.ACCEPTED, HttpResponseStatus.ACCEPTED);
map.put(RestStatus.NON_AUTHORITATIVE_INFORMATION, HttpResponseStatus.NON_AUTHORITATIVE_INFORMATION);
map.put(RestStatus.NO_CONTENT, HttpResponseStatus.NO_CONTENT);
map.put(RestStatus.RESET_CONTENT, HttpResponseStatus.RESET_CONTENT);
map.put(RestStatus.PARTIAL_CONTENT, HttpResponseStatus.PARTIAL_CONTENT);
map.put(RestStatus.MULTI_STATUS, HttpResponseStatus.INTERNAL_SERVER_ERROR); // no status for this??
map.put(RestStatus.MULTIPLE_CHOICES, HttpResponseStatus.MULTIPLE_CHOICES);
map.put(RestStatus.MOVED_PERMANENTLY, HttpResponseStatus.MOVED_PERMANENTLY);
map.put(RestStatus.FOUND, HttpResponseStatus.FOUND);
map.put(RestStatus.SEE_OTHER, HttpResponseStatus.SEE_OTHER);
map.put(RestStatus.NOT_MODIFIED, HttpResponseStatus.NOT_MODIFIED);
map.put(RestStatus.USE_PROXY, HttpResponseStatus.USE_PROXY);
map.put(RestStatus.TEMPORARY_REDIRECT, HttpResponseStatus.TEMPORARY_REDIRECT);
map.put(RestStatus.BAD_REQUEST, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.UNAUTHORIZED, HttpResponseStatus.UNAUTHORIZED);
map.put(RestStatus.PAYMENT_REQUIRED, HttpResponseStatus.PAYMENT_REQUIRED);
map.put(RestStatus.FORBIDDEN, HttpResponseStatus.FORBIDDEN);
map.put(RestStatus.NOT_FOUND, HttpResponseStatus.NOT_FOUND);
map.put(RestStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.METHOD_NOT_ALLOWED);
map.put(RestStatus.NOT_ACCEPTABLE, HttpResponseStatus.NOT_ACCEPTABLE);
map.put(RestStatus.PROXY_AUTHENTICATION, HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED);
map.put(RestStatus.REQUEST_TIMEOUT, HttpResponseStatus.REQUEST_TIMEOUT);
map.put(RestStatus.CONFLICT, HttpResponseStatus.CONFLICT);
map.put(RestStatus.GONE, HttpResponseStatus.GONE);
map.put(RestStatus.LENGTH_REQUIRED, HttpResponseStatus.LENGTH_REQUIRED);
map.put(RestStatus.PRECONDITION_FAILED, HttpResponseStatus.PRECONDITION_FAILED);
map.put(RestStatus.REQUEST_ENTITY_TOO_LARGE, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
map.put(RestStatus.REQUEST_URI_TOO_LONG, HttpResponseStatus.REQUEST_URI_TOO_LONG);
map.put(RestStatus.UNSUPPORTED_MEDIA_TYPE, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
map.put(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE);
map.put(RestStatus.EXPECTATION_FAILED, HttpResponseStatus.EXPECTATION_FAILED);
map.put(RestStatus.UNPROCESSABLE_ENTITY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.LOCKED, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.FAILED_DEPENDENCY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.TOO_MANY_REQUESTS, HttpResponseStatus.TOO_MANY_REQUESTS);
map.put(RestStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR);
map.put(RestStatus.NOT_IMPLEMENTED, HttpResponseStatus.NOT_IMPLEMENTED);
map.put(RestStatus.BAD_GATEWAY, HttpResponseStatus.BAD_GATEWAY);
map.put(RestStatus.SERVICE_UNAVAILABLE, HttpResponseStatus.SERVICE_UNAVAILABLE);
map.put(RestStatus.GATEWAY_TIMEOUT, HttpResponseStatus.GATEWAY_TIMEOUT);
map.put(RestStatus.HTTP_VERSION_NOT_SUPPORTED, HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
MAP = Collections.unmodifiableMap(map);
@Override
public void close() {
channel.close();
}

private static HttpResponseStatus getStatus(RestStatus status) {
return MAP.getOrDefault(status, HttpResponseStatus.INTERNAL_SERVER_ERROR);
public Channel getNettyChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getResponse(), readyResponse.v2());
ctx.write(readyResponse.v1(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
Expand Down
Loading

0 comments on commit fcf1e41

Please sign in to comment.