Skip to content

Commit

Permalink
Polishing #2406
Browse files Browse the repository at this point in the history
Refactor enabledCommands to use a Predicate for metrics recording.

Original pull request: #2407
  • Loading branch information
mp911de committed Aug 15, 2023
1 parent e66c148 commit 95dc1bf
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 45 deletions.
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/metrics/CommandLatencyRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.net.SocketAddress;

import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;

/**
* Interface defining a method to collect command latency metrics based upon command completion. Command latencies are collected
Expand Down Expand Up @@ -52,6 +53,23 @@ public boolean isEnabled() {
};
}

/**
* Record the command latency per {@code connectionPoint} and {@code commandType}.
*
* @param local the local address
* @param remote the remote address
* @param command the command
* @param firstResponseLatency latency value in {@link java.util.concurrent.TimeUnit#NANOSECONDS} from send to the first
* response
* @param completionLatency latency value in {@link java.util.concurrent.TimeUnit#NANOSECONDS} from send to the command
* completion
* @since 6.3
*/
default void recordCommandLatency(SocketAddress local, SocketAddress remote, RedisCommand<?, ?, ?> command,
long firstResponseLatency, long completionLatency) {
recordCommandLatency(local, remote, command.getType(), firstResponseLatency, completionLatency);
}

/**
* Record the command latency per {@code connectionPoint} and {@code commandType}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.netty.channel.local.LocalAddress;
Expand Down Expand Up @@ -68,14 +69,23 @@ public MicrometerCommandLatencyRecorder(MeterRegistry meterRegistry, MicrometerO
}

@Override
public void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword protocolKeyword,
public void recordCommandLatency(SocketAddress local, SocketAddress remote, RedisCommand<?, ?, ?> redisCommand,
long firstResponseLatency, long completionLatency) {

if (!isEnabled() || !isCommandEnabled(protocolKeyword)) {
if (isEnabled() && isCommandEnabled(redisCommand)) {
recordCommandLatency(local, remote, redisCommand.getType(), firstResponseLatency, completionLatency);
}
}

@Override
public void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
long firstResponseLatency, long completionLatency) {

if (!isEnabled()) {
return;
}

CommandLatencyId commandLatencyId = createId(local, remote, protocolKeyword);
CommandLatencyId commandLatencyId = createId(local, remote, commandType);

Timer firstResponseTimer = firstResponseTimers.computeIfAbsent(commandLatencyId, this::firstResponseTimer);
firstResponseTimer.record(firstResponseLatency, TimeUnit.NANOSECONDS);
Expand All @@ -89,9 +99,8 @@ public boolean isEnabled() {
return options.isEnabled();
}

private boolean isCommandEnabled(ProtocolKeyword protocolKeyword) {
return options.getEnabledCommands().isEmpty() ||
options.getEnabledCommands().stream().anyMatch(command -> command.name().equals(protocolKeyword.name()));
private boolean isCommandEnabled(RedisCommand<?, ?, ?> redisCommand) {
return options.getMetricsFilter().test(redisCommand);
}

private CommandLatencyId createId(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType) {
Expand Down
83 changes: 54 additions & 29 deletions src/main/java/io/lettuce/core/metrics/MicrometerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
package io.lettuce.core.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.RedisCommand;
import io.micrometer.core.instrument.Tags;

/**
* Configuration options for {@link MicrometerCommandLatencyRecorder}.
*
* @author Steven Sheehy
* @author Mark Paluch
* @author André Tibola
* @since 6.1
*/
public class MicrometerOptions {
Expand All @@ -45,11 +47,8 @@ public class MicrometerOptions {

public static final double[] DEFAULT_TARGET_PERCENTILES = new double[] { 0.50, 0.90, 0.95, 0.99, 0.999 };

private static final List<CommandType> DEFAULT_ENABLED_COMMANDS = new ArrayList<>();

private static final MicrometerOptions DISABLED = builder().disable().build();


private final Builder builder;

private final boolean enabled;
Expand All @@ -62,23 +61,23 @@ public class MicrometerOptions {

private final Duration minLatency;

private final Predicate<RedisCommand<?, ?, ?>> metricsFilter;

private final Tags tags;

private final double[] targetPercentiles;

private final List<CommandType> enabledCommands;

protected MicrometerOptions(Builder builder) {

this.builder = builder;
this.enabled = builder.enabled;
this.histogram = builder.histogram;
this.localDistinction = builder.localDistinction;
this.metricsFilter = builder.metricsFilter;
this.maxLatency = builder.maxLatency;
this.minLatency = builder.minLatency;
this.tags = builder.tags;
this.targetPercentiles = builder.targetPercentiles;
this.enabledCommands = builder.enabledCommands;
}

/**
Expand Down Expand Up @@ -130,6 +129,8 @@ public static class Builder {

private boolean localDistinction = DEFAULT_LOCAL_DISTINCTION;

private Predicate<RedisCommand<?, ?, ?>> metricsFilter = command -> true;

private Duration maxLatency = DEFAULT_MAX_LATENCY;

private Duration minLatency = DEFAULT_MIN_LATENCY;
Expand All @@ -138,8 +139,6 @@ public static class Builder {

private double[] targetPercentiles = DEFAULT_TARGET_PERCENTILES;

private List<CommandType> enabledCommands = DEFAULT_ENABLED_COMMANDS;

private Builder() {
}

Expand Down Expand Up @@ -193,12 +192,49 @@ public Builder localDistinction(boolean localDistinction) {
return this;
}

/**
* Sets which commands are enabled for latency recording. Defaults to an empty list, which means all commands will be
* recorded. Configuring enabled commands overwrites {@link #metricsFilter(Predicate)}.
*
* @param commands list of Redis commands that are enabled for latency recording, must not be {@code null}
* @return this {@link Builder}.
* @since 6.3
*/
public Builder enabledCommands(CommandType... commands) {

LettuceAssert.notNull(commands, "Commands must not be null");

if (commands.length == 0) {
return metricsFilter(command -> true);
}

Set<String> enabledCommands = new HashSet<>(commands.length);
for (CommandType enabledCommand : commands) {
enabledCommands.add(enabledCommand.name());
}

return metricsFilter(command -> enabledCommands.contains(command.getType().name()));
}

/**
* Configures a filter {@link Predicate} to filter which commands should be reported to Micrometer.
*
* @param filter the filter predicate to test commands.
* @return this {@link Builder}.
* @since 6.3
*/
public Builder metricsFilter(Predicate<RedisCommand<?, ?, ?>> filter) {
LettuceAssert.notNull(filter, "Metrics filter predicate must not be null");
this.metricsFilter = filter;
return this;
}

/**
* Sets the maximum value that this timer is expected to observe. Sets an upper bound on histogram buckets that are
* shipped to monitoring systems that support aggregable percentile approximations. Only applicable when histogram is
* enabled. Defaults to {@code 5m}. See {@link MicrometerOptions#DEFAULT_MAX_LATENCY}.
*
* @param maxLatency The maximum value that this timer is expected to observe
* @param maxLatency the maximum value that this timer is expected to observe, must not be {@code null}
* @return this {@link Builder}.
*/
public Builder maxLatency(Duration maxLatency) {
Expand All @@ -212,7 +248,7 @@ public Builder maxLatency(Duration maxLatency) {
* shipped to monitoring systems that support aggregable percentile approximations. Only applicable when histogram is
* enabled. Defaults to {@code 1ms}. See {@link MicrometerOptions#DEFAULT_MIN_LATENCY}.
*
* @param minLatency The minimum value that this timer is expected to observe
* @param minLatency the minimum value that this timer is expected to observe
* @return this {@link Builder}.
*/
public Builder minLatency(Duration minLatency) {
Expand All @@ -224,7 +260,7 @@ public Builder minLatency(Duration minLatency) {
/**
* Extra tags to add to the generated metrics. Defaults to {@code Tags.empty()}.
*
* @param tags Tags to add to the metrics
* @param tags tags to add to the metrics
* @return this {@link Builder}.
*/
public Builder tags(Tags tags) {
Expand All @@ -246,18 +282,6 @@ public Builder targetPercentiles(double[] targetPercentiles) {
return this;
}

/**
* Sets which commands are enabled for latency recording. Defaults to an empty list, which means all commands will be recorded
* See {@link MicrometerOptions#DEFAULT_ENABLED_COMMANDS}.
*
* @param commands list of Redis commands that are enabled for latency recording
* @return this {@link Builder}.
*/
public Builder enabledCommands(CommandType... commands) {
this.enabledCommands = Arrays.asList(commands);
return this;
}

/**
* @return a new instance of {@link MicrometerOptions}.
*/
Expand All @@ -279,6 +303,10 @@ public boolean localDistinction() {
return localDistinction;
}

public Predicate<RedisCommand<?, ?, ?>> getMetricsFilter() {
return metricsFilter;
}

public Duration maxLatency() {
return maxLatency;
}
Expand All @@ -297,7 +325,4 @@ public double[] targetPercentiles() {
return result;
}

public List<CommandType> getEnabledCommands() {
return enabledCommands;
}
}
7 changes: 3 additions & 4 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?
return false;
}

recordLatency(withLatency, command.getType());
recordLatency(withLatency, command);

return true;
}
Expand Down Expand Up @@ -903,15 +903,14 @@ protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> comm
decodeBufferPolicy.afterCommandDecoded(buffer);
}

private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
private void recordLatency(WithLatency withLatency, RedisCommand<?, ?, ?> command) {

if (withLatency != null && latencyMetricsEnabled && channel != null && remote() != null) {

long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
long completionLatency = nanoTime() - withLatency.getSent();

commandLatencyRecorder.recordCommandLatency(local(), remote(), commandType, firstResponseLatency,
completionLatency);
commandLatencyRecorder.recordCommandLatency(local(), remote(), command, firstResponseLatency, completionLatency);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
Expand Down Expand Up @@ -172,8 +173,10 @@ void enabledCommandsNotEmpty() {
MicrometerOptions options = MicrometerOptions.builder().enabledCommands(CommandType.CLUSTER).build();
MicrometerCommandLatencyRecorder commandLatencyRecorder = new MicrometerCommandLatencyRecorder(meterRegistry, options);

commandLatencyRecorder.recordCommandLatency(LOCAL_ADDRESS, REMOTE_ADDRESS, CommandType.AUTH, 1, 10);
commandLatencyRecorder.recordCommandLatency(LOCAL_ADDRESS, REMOTE_ADDRESS, CommandType.CLUSTER, 1, 10);
commandLatencyRecorder.recordCommandLatency(LOCAL_ADDRESS, REMOTE_ADDRESS, new Command<>(CommandType.AUTH, null), 1,
10);
commandLatencyRecorder.recordCommandLatency(LOCAL_ADDRESS, REMOTE_ADDRESS, new Command<>(CommandType.CLUSTER, null), 1,
10);

assertThat(meterRegistry.find(METRIC_COMPLETION).timers()).hasSize(1);
assertThat(meterRegistry.find(METRIC_FIRST_RESPONSE).timers()).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

import java.time.Duration;

import io.lettuce.core.protocol.CommandType;
import org.junit.jupiter.api.Test;

import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.micrometer.core.instrument.Tags;

/**
* Unit tests for {@link MicrometerOptions}.
*
* @author Steven Sheehy
* @author André Tibola
* @author Mark Paluch
*/
class MicrometerOptionsUnitTests {

Expand Down Expand Up @@ -99,10 +102,11 @@ void targetPercentiles() {

@Test
void enabledCommands() {
CommandType[] enabledCommands = {CommandType.HSET, CommandType.HGET, CommandType.EXPIRE};
CommandType[] enabledCommands = { CommandType.HSET, CommandType.HGET, CommandType.EXPIRE };
MicrometerOptions options = MicrometerOptions.builder().enabledCommands(enabledCommands).build();

assertThat(options.getEnabledCommands()).hasSize(3).containsExactly(enabledCommands);
assertThat(options.getMetricsFilter()).accepts(new Command<>(CommandType.HSET, null))
.rejects(new Command<>(CommandType.SET, null));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ void shouldRecordCorrectFirstResponseLatency() throws Exception {

sut.channelRead(context, Unpooled.wrappedBuffer("*1\r\n+OK\r\n".getBytes()));

verify(latencyCollector).recordCommandLatency(any(), any(), eq(CommandType.APPEND), gt(0L), gt(0L));
verify(latencyCollector).recordCommandLatency(any(), any(), any(LatencyMeteredCommand.class), gt(0L), gt(0L));

sut.channelUnregistered(context);
}
Expand Down

0 comments on commit 95dc1bf

Please sign in to comment.