Skip to content

Commit

Permalink
Test of single connection reuse for Netty (#2630)
Browse files Browse the repository at this point in the history
* Test of single connection reuse for Netty

* Format
  • Loading branch information
iNikem authored Mar 26, 2021
1 parent 43f4ab4 commit 5a9b0b9
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientTracingHandler
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
Expand Down Expand Up @@ -374,4 +375,9 @@ class Netty41ClientTest extends HttpClientTest implements AgentTestTrait {
ch.pipeline().addLast("added_in_initializer", new HttpClientCodec())
}
}

@Override
SingleConnection createSingleConnection(String host, int port) {
return new SingleNettyConnection(host, port)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.opentelemetry.instrumentation.test.base.SingleConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/*
Netty does not actually support proper http pipelining and has no way to correlate incoming response
message with some sent request. This means that without some support from the higher level protocol
we cannot concurrently send several requests across the same channel. Thus doRequest method of this
class is synchronised. Yes, it seems kinda pointless, but at least we test that our instrumentation
does not wreak havoc on Netty channel.
*/
public class SingleNettyConnection implements SingleConnection {
private final String host;
private final int port;
private final Channel channel;

public SingleNettyConnection(String host, int port) {
this.host = host;
this.port = port;
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new HttpClientCodec());
}
});

ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
throw new IllegalStateException(channelFuture.cause());
} else {
channel = channelFuture.channel();
}
}

@Override
public synchronized int doRequest(String path, Map<String, String> headers)
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Integer> result = new CompletableFuture<>();

channel.pipeline().addLast(new ClientHandler(null, result));

String url;
try {
url = new URL("http", host, port, path).toString();
} catch (MalformedURLException e) {
throw new ExecutionException(e);
}

HttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, url, Unpooled.EMPTY_BUFFER);
request.headers().set(HttpHeaderNames.HOST, host);
headers.forEach((k, v) -> request.headers().set(k, v));

channel.writeAndFlush(request).get();
return result.get(20, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
* Helper class for http client tests which require a single connection.
Expand All @@ -24,5 +25,5 @@ public interface SingleConnection {
String REQUEST_ID_HEADER = "test-request-id";

int doRequest(String path, Map<String, String> headers)
throws ExecutionException, InterruptedException;
throws ExecutionException, InterruptedException, TimeoutException;
}

0 comments on commit 5a9b0b9

Please sign in to comment.