Skip to content

Commit

Permalink
fix #931 Cache meters per tags
Browse files Browse the repository at this point in the history
Utilise SslReadHandler for updating the TLS metrics
  • Loading branch information
violetagg committed Jan 9, 2020
1 parent ac5cbb9 commit 72ca8eb
Show file tree
Hide file tree
Showing 26 changed files with 508 additions and 425 deletions.
1 change: 1 addition & 0 deletions src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class Metrics {

public static final String RESPONSE_TIME = ".response.time";

public static final String ADDRESS_RESOLVER = ".address.resolver";

// Tags
public static final String REMOTE_ADDRESS = "remote.address";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ final class AddressResolverGroupMetrics extends AddressResolverGroup<SocketAddre

final ChannelMetricsRecorder recorder;

AddressResolverGroupMetrics(AddressResolverGroup<SocketAddress> resolverGroup) {
this(resolverGroup, null);
}

AddressResolverGroupMetrics(AddressResolverGroup<SocketAddress> resolverGroup,
@Nullable ChannelMetricsRecorder recorder) {
this.resolverGroup = resolverGroup;
Expand Down Expand Up @@ -108,18 +104,10 @@ Future<List<SocketAddress>> resolveAllInternal(SocketAddress address, Supplier<F
}

void record(long resolveTimeStart, String status, SocketAddress remoteAddress) {
if (recorder == null) {
MicrometerChannelMetricsRecorder._recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
else {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
};
}
Expand Down
72 changes: 5 additions & 67 deletions src/main/java/reactor/netty/channel/BootstrapHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.netty.resolver.AddressResolverGroup;
import reactor.core.Exceptions;
import reactor.netty.ConnectionObserver;
import reactor.netty.Metrics;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.util.Logger;
Expand Down Expand Up @@ -441,29 +440,12 @@ static ChannelHandler updateConfiguration(@Nullable ChannelHandler handler,
return new LoggingHandlerSupportConsumer(handler, debugSsl);
}

public static ServerBootstrap updateMetricsSupport(ServerBootstrap b, String name, String protocol) {
return updateConfiguration(b,
NettyPipeline.ChannelMetricsHandler,
new MetricsSupportConsumer(name, protocol, true));
}

public static ServerBootstrap updateMetricsSupport(ServerBootstrap b, ChannelMetricsRecorder recorder) {
return updateConfiguration(b,
NettyPipeline.ChannelMetricsHandler,
new MetricsSupportConsumer(recorder, true));
}

@SuppressWarnings("unchecked")
public static Bootstrap updateMetricsSupport(Bootstrap b, String name, String protocol) {
updateConfiguration(b,
NettyPipeline.ChannelMetricsHandler,
new DeferredMetricsSupport(name, protocol, false));

b.resolver(new AddressResolverGroupMetrics((AddressResolverGroup<SocketAddress>) b.config().resolver()));

return b;
}

@SuppressWarnings("unchecked")
public static Bootstrap updateMetricsSupport(Bootstrap b, ChannelMetricsRecorder recorder) {
updateConfiguration(b,
Expand Down Expand Up @@ -687,36 +669,18 @@ public int hashCode() {
static final class DeferredMetricsSupport
implements Function<Bootstrap, BiConsumer<ConnectionObserver, Channel>> {

final String name;

final String protocol;

final ChannelMetricsRecorder recorder;

final boolean onServer;

DeferredMetricsSupport(String name, String protocol, boolean onServer) {
this.name = name;
this.protocol = protocol;
this.recorder = null;
this.onServer = onServer;
}

DeferredMetricsSupport(ChannelMetricsRecorder recorder, boolean onServer) {
this.name = null;
this.protocol = null;
this.recorder = recorder;
this.onServer = onServer;
}

@Override
public BiConsumer<ConnectionObserver, Channel> apply(Bootstrap bootstrap) {
if (recorder != null) {
return new MetricsSupportConsumer(recorder, bootstrap.config().remoteAddress(), onServer);
}
else {
return new MetricsSupportConsumer(name, bootstrap.config().remoteAddress(), protocol, onServer);
}
return new MetricsSupportConsumer(recorder, bootstrap.config().remoteAddress(), onServer);
}

@Override
Expand All @@ -729,50 +693,30 @@ public boolean equals(Object o) {
}
DeferredMetricsSupport that = (DeferredMetricsSupport) o;
return onServer == that.onServer &&
Objects.equals(name, that.name) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(recorder, that.recorder);
}

@Override
public int hashCode() {
return Objects.hash(name, protocol, recorder, onServer);
return Objects.hash(recorder, onServer);
}
}

static final class MetricsSupportConsumer
implements BiConsumer<ConnectionObserver, Channel> {

final String name;

final SocketAddress remoteAddress;

final String protocol;

final ChannelMetricsRecorder recorder;

final boolean onServer;

MetricsSupportConsumer(String name, String protocol, boolean onServer) {
this(name, null, protocol, onServer);
}

MetricsSupportConsumer(String name, @Nullable SocketAddress remoteAddress, String protocol, boolean onServer) {
this.name = name;
this.remoteAddress = remoteAddress;
this.protocol = protocol;
this.recorder = null;
this.onServer = onServer;
}

MetricsSupportConsumer(ChannelMetricsRecorder recorder, boolean onServer) {
this(recorder, null, onServer);
}

MetricsSupportConsumer(ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, boolean onServer) {
this.name = null;
this.remoteAddress = remoteAddress;
this.protocol = null;
this.recorder = recorder;
this.onServer = onServer;
}
Expand All @@ -783,26 +727,20 @@ public void accept(ConnectionObserver connectionObserver, Channel channel) {
//TODO or after the proxy?
SocketAddress address = remoteAddress != null ? remoteAddress : channel.remoteAddress();

ChannelMetricsRecorder channelMetricsRecorder = recorder;
if (channelMetricsRecorder == null) {
channelMetricsRecorder =
new MicrometerChannelMetricsRecorder(name, Metrics.formatSocketAddress(address), protocol);
}

channel.pipeline()
.addFirst(NettyPipeline.ChannelMetricsHandler,
new ChannelMetricsHandler(channelMetricsRecorder,
new ChannelMetricsHandler(recorder,
//Check the remote address is it on the proxy or not
address,
onServer));

ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.registerMetrics("pooled", alloc.hashCode() + "",
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled",
((PooledByteBufAllocator) alloc).metric());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.registerMetrics("unpooled", alloc.hashCode() + "",
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled",
((UnpooledByteBufAllocator) alloc).metric());
}
}
Expand Down
101 changes: 57 additions & 44 deletions src/main/java/reactor/netty/channel/ByteBufAllocatorMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,63 +20,76 @@
import io.micrometer.core.instrument.Metrics;
import io.netty.buffer.ByteBufAllocatorMetric;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;

/**
* @author Violeta Georgieva
*/
final class ByteBufAllocatorMetrics {
static final ByteBufAllocatorMetrics INSTANCE = new ByteBufAllocatorMetrics();

static void registerMetrics(String allocName, String id, ByteBufAllocatorMetric metrics) {
String name = String.format(NAME, allocName);

Gauge.builder(name + USED_HEAP_MEMORY, metrics, ByteBufAllocatorMetric::usedHeapMemory)
.description("The number of the bytes of the heap memory.")
.tags(ID, id)
.register(registry);

Gauge.builder(name + USED_DIRECT_MEMORY, metrics, ByteBufAllocatorMetric::usedDirectMemory)
.description("The number of the bytes of the direct memory.")
.tags(ID, id)
.register(registry);

if (metrics instanceof PooledByteBufAllocatorMetric) {
PooledByteBufAllocatorMetric pooledMetrics = (PooledByteBufAllocatorMetric) metrics;

Gauge.builder(name + HEAP_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numHeapArenas)
.description("The number of heap arenas.")
.tags(ID, id)
.register(registry);

Gauge.builder(name + DIRECT_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numDirectArenas)
.description("The number of direct arenas.")
.tags(ID, id)
.register(registry);
final ConcurrentMap<String, ByteBufAllocatorMetric> cache = PlatformDependent.newConcurrentHashMap();

Gauge.builder(name + THREAD_LOCAL_CACHES, pooledMetrics, PooledByteBufAllocatorMetric::numThreadLocalCaches)
.description("The number of thread local caches.")
.tags(ID, id)
.register(registry);
private ByteBufAllocatorMetrics() {
}

Gauge.builder(name + TINY_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::tinyCacheSize)
.description("The size of the tiny cache.")
.tags(ID, id)
.register(registry);
void registerMetrics(String allocName, ByteBufAllocatorMetric metrics) {
cache.computeIfAbsent(metrics.hashCode() + "", key -> {
String name = String.format(NAME, allocName);

Gauge.builder(name + SMALL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::smallCacheSize)
.description("The size of the small cache.")
.tags(ID, id)
Gauge.builder(name + USED_HEAP_MEMORY, metrics, ByteBufAllocatorMetric::usedHeapMemory)
.description("The number of the bytes of the heap memory.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + NORMAL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::normalCacheSize)
.description("The size of the normal cache.")
.tags(ID, id)
Gauge.builder(name + USED_DIRECT_MEMORY, metrics, ByteBufAllocatorMetric::usedDirectMemory)
.description("The number of the bytes of the direct memory.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + CHUNK_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::chunkSize)
.description("The chunk size for an arena.")
.tags(ID, id)
.register(registry);
}
if (metrics instanceof PooledByteBufAllocatorMetric) {
PooledByteBufAllocatorMetric pooledMetrics = (PooledByteBufAllocatorMetric) metrics;

Gauge.builder(name + HEAP_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numHeapArenas)
.description("The number of heap arenas.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + DIRECT_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numDirectArenas)
.description("The number of direct arenas.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + THREAD_LOCAL_CACHES, pooledMetrics, PooledByteBufAllocatorMetric::numThreadLocalCaches)
.description("The number of thread local caches.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + TINY_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::tinyCacheSize)
.description("The size of the tiny cache.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + SMALL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::smallCacheSize)
.description("The size of the small cache.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + NORMAL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::normalCacheSize)
.description("The size of the normal cache.")
.tags(ID, key)
.register(registry);

Gauge.builder(name + CHUNK_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::chunkSize)
.description("The chunk size for an arena.")
.tags(ID, key)
.register(registry);
}

return metrics;
});
}

static final MeterRegistry registry = Metrics.globalRegistry;
Expand Down
49 changes: 0 additions & 49 deletions src/main/java/reactor/netty/channel/ChannelMetricsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import reactor.netty.NettyPipeline;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -59,13 +56,6 @@ public void channelRegistered(ChannelHandlerContext ctx) {
new ConnectMetricsHandler(recorder));
}

if (ctx.pipeline().get(SslHandler.class) != null) {
ctx.pipeline()
.addAfter(NettyPipeline.SslHandler,
NettyPipeline.SslMetricsHandler,
new TlsMetricsHandler(recorder, remoteAddress));
}

ctx.fireChannelRegistered();
}

Expand Down Expand Up @@ -135,45 +125,6 @@ public ChannelMetricsRecorder recorder() {
return recorder;
}

static final class TlsMetricsHandler extends ChannelInboundHandlerAdapter {

final ChannelMetricsRecorder recorder;

final SocketAddress remoteAddress;

final long tlsHandshakeTimeStart;

TlsMetricsHandler(ChannelMetricsRecorder recorder, SocketAddress remoteAddress) {
this.recorder = recorder;
this.remoteAddress = remoteAddress;
this.tlsHandshakeTimeStart = System.nanoTime();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof SslHandshakeCompletionEvent) {
ctx.pipeline().remove(this);

SslHandshakeCompletionEvent handshake = (SslHandshakeCompletionEvent) evt;
String status;
if (handshake.isSuccess()) {
status = SUCCESS;
}
else {
status = ERROR;
}

recorder.recordTlsHandshakeTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - tlsHandshakeTimeStart),
status);
}

super.userEventTriggered(ctx, evt);
}

}

static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter {

final ChannelMetricsRecorder recorder;
Expand Down
Loading

0 comments on commit 72ca8eb

Please sign in to comment.