Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for HTTP/2 (server-side) #3847

Merged
merged 5 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
- Label configuration for dependabot PRs ([#4348](https://github.com/opensearch-project/OpenSearch/pull/4348))
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))

### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
Expand Down
1 change: 1 addition & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies {
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0eeffab0cd5efb699d5e4ab9b694d32fef6694b3
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCounted;
import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.Collection;
import java.util.Locale;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;

@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4Http2IT extends OpenSearchNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

public void testThatNettyHttpServerSupportsHttp2() throws Exception {
String[] requests = new String[] { "/", "/_nodes/stats", "/", "/_cluster/state", "/" };

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try {
assertThat(responses, hasSize(5));

Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
assertOpaqueIdsInAnyOrder(opaqueIds);
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

private void assertOpaqueIdsInAnyOrder(Collection<String> opaqueIds) {
// check if opaque ids are present in any order, since for HTTP/2 we use streaming (no head of line blocking)
// and responses may come back at any order
int i = 0;
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be in any order, got [%s]", opaqueIds);
assertThat(msg, opaqueIds, containsInAnyOrder(IntStream.range(0, 5).mapToObj(Integer::toString).toArray()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testLimitsInFlightRequests() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1));
try {
assertThat(singleResponse, hasSize(1));
Expand Down Expand Up @@ -130,7 +130,7 @@ public void testDoesNotLimitExcludedRequests() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> responses = nettyHttpClient.put(transportAddress.address(), requestUris);
try {
assertThat(responses, hasSize(requestUris.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try {
assertThat(responses, hasSize(5));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
package org.opensearch.http.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;

import org.opensearch.action.ActionListener;
import org.opensearch.common.Nullable;
import org.opensearch.common.concurrent.CompletableContext;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpResponse;
Expand All @@ -45,9 +48,15 @@ public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final ChannelPipeline inboundPipeline;

Netty4HttpChannel(Channel channel) {
this(channel, null);
}

Netty4HttpChannel(Channel channel, ChannelPipeline inboundPipeline) {
this.channel = channel;
this.inboundPipeline = inboundPipeline;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
}

Expand Down Expand Up @@ -81,6 +90,10 @@ public void close() {
channel.close();
}

public @Nullable ChannelPipeline inboundPipeline() {
return inboundPipeline;
}

public Channel getNettyChannel() {
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,36 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -335,38 +353,152 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
this.responseCreator = new Netty4HttpResponseCreator();
}

public ChannelHandler getRequestHandler() {
return requestHandler;
}

@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));

configurePipeline(ch);
transport.serverAcceptedChannel(nettyHttpChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}

protected void configurePipeline(Channel ch) {
final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(
Http2FrameCodecBuilder.forServer().build(),
new Http2MultiplexHandler(createHttp2ChannelInitializer(ch.pipeline()))
);
} else {
return null;
}
}
};

final HttpServerCodec sourceCodec = new HttpServerCodec(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize()
);

final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
final CleartextHttp2ServerUpgradeHandler cleartextUpgradeHandler = new CleartextHttp2ServerUpgradeHandler(
sourceCodec,
upgradeHandler,
createHttp2ChannelInitializerPriorKnowledge()
);
Comment on lines +399 to +403
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, how would we handle TLS, will that be a change in the security plugin?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we would need to make changes there, I will be picking the security plugin this week.


ch.pipeline().addLast(cleartextUpgradeHandler).addLast(new SimpleChannelInboundHandler<HttpMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);

// If this handler is hit then no upgrade has been attempted and the client is just talking HTTP
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "aggregator", aggregator);

ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
if (handlingSettings.isCompression()) {
ch.pipeline()
.addAfter("aggregator", "encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addBefore("handler", "request_creator", requestCreator);
ch.pipeline().addBefore("handler", "response_creator", responseCreator);
ch.pipeline()
.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));

ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
});
}

protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("decoder", decoder);
pipeline.addLast("decoder_compress", new HttpContentDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline().addLast("aggregator", aggregator);
pipeline.addLast("aggregator", aggregator);
if (handlingSettings.isCompression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
pipeline.addLast("request_creator", requestCreator);
pipeline.addLast("response_creator", responseCreator);
pipeline.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
pipeline.addLast("handler", requestHandler);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
protected void configureDefaultHttp2Pipeline(ChannelPipeline pipeline) {
pipeline.addLast(Http2FrameCodecBuilder.forServer().build())
.addLast(new Http2MultiplexHandler(createHttp2ChannelInitializer(pipeline)));
}

private ChannelInitializer<Channel> createHttp2ChannelInitializerPriorKnowledge() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is PriorKnowledge in this context?

Copy link
Collaborator Author

@reta reta Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is HTTP/2 specific terminology, just to explain quickly: if the connection happens over SSL/TLS, the ALPN is used to negoatiate the protocol and the client could talk to the server using HTTP/2 rigth away. Now, with h2c (clear text, no SSL/TLS), the client always starts with HTTP/1.1 and than protocol upgrade takes place. With prior knowledge, the client assumes to "know apriori" that server talks HTTP/2 and start with that right away.

return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel childChannel) throws Exception {
configureDefaultHttp2Pipeline(childChannel.pipeline());
}
};
}

/**
* Http2MultiplexHandler creates new pipeline, we are preserving the old one in case some handlers need to be
* access (like for example opensearch-security plugin which accesses SSL handlers).
*/
private ChannelInitializer<Channel> createHttp2ChannelInitializer(ChannelPipeline inboundPipeline) {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel childChannel) throws Exception {
final Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(childChannel, inboundPipeline);
childChannel.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);

final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);

childChannel.pipeline()
.addLast(new LoggingHandler(LogLevel.DEBUG))
.addLast(new Http2StreamFrameToHttpObjectCodec(true))
.addLast("byte_buf_sizer", byteBufSizer)
.addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
.addLast("decoder_decompress", new HttpContentDecompressor());

if (handlingSettings.isCompression()) {
childChannel.pipeline()
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}

childChannel.pipeline()
.addLast("aggregator", aggregator)
.addLast("request_creator", requestCreator)
.addLast("response_creator", responseCreator)
.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents))
.addLast("handler", getRequestHandler());
}
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
httpServerTransport.start();
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
final Collection<FullHttpResponse> responses = nettyHttpClient.get(
transportAddress.address(),
"/_cluster/settings?pretty=%"
Expand Down
Loading