Support for HTTP/2 (server-side) (#3847)
* Support for HTTP/2 (server-side)

Signed-off-by: Andriy Redko <[email protected]>

* Addressing code review comments

Signed-off-by: Andriy Redko <[email protected]>

* Added HTTP/1.1 channel configuration

Signed-off-by: Andriy Redko <[email protected]>

* Addressing code review comments

Signed-off-by: Andriy Redko <[email protected]>

* Update pull request URL in CHANGELOG.md

Signed-off-by: Andriy Redko <[email protected]>

Signed-off-by: Andriy Redko <[email protected]>
reta authored Aug 31, 2022
1 parent 4f65ef5 commit d72861f
Showing 12 changed files with 428 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
- Label configuration for dependabot PRs ([#4348](https://github.com/opensearch-project/OpenSearch/pull/4348))
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))

### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
1 change: 1 addition & 0 deletions modules/transport-netty4/build.gradle
@@ -58,6 +58,7 @@ dependencies {
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
@@ -0,0 +1 @@
0eeffab0cd5efb699d5e4ab9b694d32fef6694b3
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCounted;
import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.Collection;
import java.util.Locale;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;

@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4Http2IT extends OpenSearchNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

public void testThatNettyHttpServerSupportsHttp2() throws Exception {
String[] requests = new String[] { "/", "/_nodes/stats", "/", "/_cluster/state", "/" };

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try {
assertThat(responses, hasSize(5));

Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
assertOpaqueIdsInAnyOrder(opaqueIds);
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

private void assertOpaqueIdsInAnyOrder(Collection<String> opaqueIds) {
// check that the opaque ids are present in any order, since for HTTP/2 we use streaming (no head of line blocking)
// and responses may come back in any order
int i = 0;
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be in any order, got [%s]", opaqueIds);
assertThat(msg, opaqueIds, containsInAnyOrder(IntStream.range(0, 5).mapToObj(Integer::toString).toArray()));
}

}
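Note on the assertion above: each request is tagged with an X-Opaque-Id header, the server echoes that header back on the response, and the test only checks set membership because the HTTP/2 streams are multiplexed and complete in no particular order. Netty4HttpClient itself is not part of this diff, so the helper below is only an illustrative sketch of that tagging (the OpaqueIdTagging class and its methods are hypothetical, not the actual client code):

import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper: tag each request with an X-Opaque-Id header so responses can be
// matched up even when the HTTP/2 streams finish out of order.
final class OpaqueIdTagging {

    static List<FullHttpRequest> tag(String... uris) {
        final List<FullHttpRequest> requests = new ArrayList<>();
        for (int i = 0; i < uris.length; i++) {
            final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uris[i]);
            request.headers().add("X-Opaque-Id", String.valueOf(i)); // echoed back by the server
            requests.add(request);
        }
        return requests;
    }

    static List<String> opaqueIdsOf(Collection<FullHttpResponse> responses) {
        final List<String> ids = new ArrayList<>();
        for (FullHttpResponse response : responses) {
            ids.add(response.headers().get("X-Opaque-Id"));
        }
        return ids;
    }
}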
@@ -100,7 +100,7 @@ public void testLimitsInFlightRequests() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

- try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
+ try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1));
try {
assertThat(singleResponse, hasSize(1));
@@ -130,7 +130,7 @@ public void testDoesNotLimitExcludedRequests() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

- try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
+ try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> responses = nettyHttpClient.put(transportAddress.address(), requestUris);
try {
assertThat(responses, hasSize(requestUris.size()));
@@ -61,7 +61,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

- try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
+ try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try {
assertThat(responses, hasSize(5));
@@ -33,7 +33,10 @@
package org.opensearch.http.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;

import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.concurrent.CompletableContext;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpResponse;
@@ -45,9 +48,15 @@ public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final ChannelPipeline inboundPipeline;

Netty4HttpChannel(Channel channel) {
this(channel, null);
}

Netty4HttpChannel(Channel channel, ChannelPipeline inboundPipeline) {
this.channel = channel;
this.inboundPipeline = inboundPipeline;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
}

@@ -81,6 +90,10 @@ public void close() {
channel.close();
}

public @Nullable ChannelPipeline inboundPipeline() {
return inboundPipeline;
}

public Channel getNettyChannel() {
return channel;
}
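For context: the new inboundPipeline() accessor exists because Http2MultiplexHandler gives every HTTP/2 stream its own child channel with its own pipeline, so handlers installed on the accepted connection (an SslHandler, for instance) are no longer reachable from the channel a request arrives on. Below is a minimal sketch of how a downstream consumer such as a security plugin might use it; it is illustrative only, and the SslHandlerLookup helper is hypothetical, not part of this change:

import io.netty.handler.ssl.SslHandler;

// Hypothetical helper: find the SslHandler for a request channel, falling back to the parent
// (inbound) pipeline when the request arrived on an HTTP/2 stream channel.
final class SslHandlerLookup {

    static SslHandler find(Netty4HttpChannel httpChannel) {
        SslHandler ssl = httpChannel.getNettyChannel().pipeline().get(SslHandler.class);
        if (ssl == null && httpChannel.inboundPipeline() != null) {
            // HTTP/2 child channels do not carry the TLS handler; it sits on the connection's pipeline.
            ssl = httpChannel.inboundPipeline().get(SslHandler.class);
        }
        return ssl;
    }
}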
@@ -40,18 +40,36 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
@@ -335,38 +353,152 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
this.responseCreator = new Netty4HttpResponseCreator();
}

public ChannelHandler getRequestHandler() {
return requestHandler;
}

@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));

configurePipeline(ch);
transport.serverAcceptedChannel(nettyHttpChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}

protected void configurePipeline(Channel ch) {
final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(
Http2FrameCodecBuilder.forServer().build(),
new Http2MultiplexHandler(createHttp2ChannelInitializer(ch.pipeline()))
);
} else {
return null;
}
}
};

final HttpServerCodec sourceCodec = new HttpServerCodec(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize()
);

final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
final CleartextHttp2ServerUpgradeHandler cleartextUpgradeHandler = new CleartextHttp2ServerUpgradeHandler(
sourceCodec,
upgradeHandler,
createHttp2ChannelInitializerPriorKnowledge()
);

ch.pipeline().addLast(cleartextUpgradeHandler).addLast(new SimpleChannelInboundHandler<HttpMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);

// If this handler is hit then no upgrade has been attempted and the client is just talking HTTP
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "aggregator", aggregator);

ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
if (handlingSettings.isCompression()) {
ch.pipeline()
.addAfter("aggregator", "encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addBefore("handler", "request_creator", requestCreator);
ch.pipeline().addBefore("handler", "response_creator", responseCreator);
ch.pipeline()
.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));

ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
});
}

protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("decoder", decoder);
pipeline.addLast("decoder_compress", new HttpContentDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline().addLast("aggregator", aggregator);
pipeline.addLast("aggregator", aggregator);
if (handlingSettings.isCompression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
pipeline.addLast("request_creator", requestCreator);
pipeline.addLast("response_creator", responseCreator);
pipeline.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
pipeline.addLast("handler", requestHandler);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
protected void configureDefaultHttp2Pipeline(ChannelPipeline pipeline) {
pipeline.addLast(Http2FrameCodecBuilder.forServer().build())
.addLast(new Http2MultiplexHandler(createHttp2ChannelInitializer(pipeline)));
}

private ChannelInitializer<Channel> createHttp2ChannelInitializerPriorKnowledge() {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel childChannel) throws Exception {
configureDefaultHttp2Pipeline(childChannel.pipeline());
}
};
}

/**
* Http2MultiplexHandler creates a new pipeline; we preserve the old one in case some handlers need to be
* accessed (for example, the opensearch-security plugin, which accesses the SSL handlers).
*/
private ChannelInitializer<Channel> createHttp2ChannelInitializer(ChannelPipeline inboundPipeline) {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel childChannel) throws Exception {
final Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(childChannel, inboundPipeline);
childChannel.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);

final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);

childChannel.pipeline()
.addLast(new LoggingHandler(LogLevel.DEBUG))
.addLast(new Http2StreamFrameToHttpObjectCodec(true))
.addLast("byte_buf_sizer", byteBufSizer)
.addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
.addLast("decoder_decompress", new HttpContentDecompressor());

if (handlingSettings.isCompression()) {
childChannel.pipeline()
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}

childChannel.pipeline()
.addLast("aggregator", aggregator)
.addLast("request_creator", requestCreator)
.addLast("response_creator", responseCreator)
.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents))
.addLast("handler", getRequestHandler());
}
};
}
}
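Taken together, the handler above accepts three flavors of cleartext traffic on the same port: plain HTTP/1.1 (the SimpleChannelInboundHandler fallback sets up the HTTP/1.1 pipeline when no upgrade was attempted), HTTP/1.1 upgraded to HTTP/2 via the h2c Upgrade header (HttpServerUpgradeHandler), and HTTP/2 with prior knowledge (CleartextHttp2ServerUpgradeHandler). For orientation, here is a rough client-side counterpart of the prior-knowledge path; it is an illustrative sketch only, and the Http2PriorKnowledgeClient class is hypothetical, not how Netty4HttpClient is implemented in this PR:

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;

// Hypothetical client bootstrap for the prior-knowledge (h2c) path: the connection channel
// speaks HTTP/2 frames, and each request goes out on its own multiplexed stream channel.
class Http2PriorKnowledgeClient {

    static ChannelInitializer<SocketChannel> connectionInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ch.pipeline()
                    .addLast(Http2FrameCodecBuilder.forClient().build())
                    // Server-initiated streams are not expected; ignore them.
                    .addLast(new Http2MultiplexHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel inboundStream) {
                            // no-op
                        }
                    }));
            }
        };
    }

    static Http2StreamChannel openStream(Channel connection) {
        final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(connection);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel streamChannel) {
                streamChannel.pipeline()
                    .addLast(new Http2StreamFrameToHttpObjectCodec(false)) // false = client side
                    .addLast(new HttpObjectAggregator(64 * 1024));
            }
        });
        return bootstrap.open().syncUninterruptibly().getNow();
    }
}

A FullHttpRequest written to the returned stream channel is translated into HEADERS/DATA frames by Http2StreamFrameToHttpObjectCodec, and the aggregated FullHttpResponse comes back on the same stream.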

@@ -117,7 +117,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
httpServerTransport.start();
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

- try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
+ try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
final Collection<FullHttpResponse> responses = nettyHttpClient.get(
transportAddress.address(),
"/_cluster/settings?pretty=%"