Skip to content

Commit

Permalink
Switch default pause detector to NoPauseDetector #1995
Browse files Browse the repository at this point in the history
Original pull request: #2005.
  • Loading branch information
sinrimin authored and mp911de committed Feb 25, 2022
1 parent c47dd5f commit c9c113f
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ static CommandLatencyCollectorOptions.Builder builder() {
*/
boolean isEnabled();

/**
* Returns whether PauseDetector is enabled.
*
* @return {@code true} if the PauseDetector is enabled
*/
boolean usePauseDetector();

/**
* Builder for {@link CommandLatencyCollectorOptions}.
*
Expand All @@ -124,6 +131,21 @@ interface Builder {
*/
Builder enable();

/**
* Use LatencyUtils.SimplePauseDetector to detects pauses.
* See {@link org.LatencyUtils.SimplePauseDetector}
*
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
*/
Builder usePauseDetector();

/**
* Do not detects pauses.
*
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
*/
Builder useNoPauseDetector();

/**
* Enables per connection metrics tracking insead of per host/port. If {@code true}, multiple connections to the same
* host/connection point will be recorded separately which allows to inspect every connection individually. If
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.HdrHistogram.Histogram;
import org.LatencyUtils.LatencyStats;
import org.LatencyUtils.PauseDetector;
import org.LatencyUtils.PauseDetectorListener;
import org.LatencyUtils.SimplePauseDetector;

import io.lettuce.core.internal.LettuceAssert;
Expand All @@ -58,7 +59,9 @@ public class DefaultCommandLatencyCollector implements CommandLatencyCollector {

private static final boolean HDR_UTILS_AVAILABLE = isPresent("org.HdrHistogram.Histogram");

private static final PauseDetectorWrapper GLOBAL_PAUSE_DETECTOR = PauseDetectorWrapper.create();
private static final PauseDetectorWrapper GLOBAL_PAUSE_DETECTOR = PauseDetectorWrapper.create(true);

private static final PauseDetectorWrapper GLOBAL_NO_PAUSE_DETECTOR = PauseDetectorWrapper.create(false);

private static final long MIN_LATENCY = 1000;

Expand Down Expand Up @@ -102,16 +105,13 @@ public void recordCommandLatency(SocketAddress local, SocketAddress remote, Prot

do {
if (PAUSE_DETECTOR_UPDATER.get(this) == null) {
if (PAUSE_DETECTOR_UPDATER.compareAndSet(this, null, GLOBAL_PAUSE_DETECTOR)) {
if (PAUSE_DETECTOR_UPDATER.compareAndSet(this, null,
options.usePauseDetector() ? GLOBAL_PAUSE_DETECTOR : GLOBAL_NO_PAUSE_DETECTOR)) {
PAUSE_DETECTOR_UPDATER.get(this).retain();
}
}
PauseDetectorWrapper pauseDetectorWrapper = PAUSE_DETECTOR_UPDATER.get(this);
if (pauseDetectorWrapper instanceof DefaultPauseDetectorWrapper) {
pauseDetector = ((DefaultPauseDetectorWrapper) pauseDetectorWrapper).getPauseDetector();
} else {
return;
}
pauseDetector = pauseDetectorWrapper.getPauseDetector();
} while (pauseDetector == null);

PauseDetector pauseDetectorToUse = pauseDetector;
Expand Down Expand Up @@ -322,6 +322,34 @@ public Histogram getCompletionHistogram() {

}

/**
* No-operation {@link PauseDetector} implementation.
*/
static class NoPauseDetector extends PauseDetector {
protected NoPauseDetector() {
}

@Override
protected synchronized void notifyListeners(long pauseLengthNsec, long pauseEndTimeNsec) {
}

@Override
public synchronized void addListener(PauseDetectorListener listener) {
}

@Override
public synchronized void addListener(PauseDetectorListener listener, boolean isHighPriority) {
}

@Override
public synchronized void removeListener(PauseDetectorListener listener) {
}

@Override
public void shutdown() {
}
}

/**
* Wrapper for initialization of {@link PauseDetector}. Encapsulates absence of LatencyUtils.
*/
Expand All @@ -331,6 +359,7 @@ interface PauseDetectorWrapper {
* No-operation {@link PauseDetectorWrapper} implementation.
*/
PauseDetectorWrapper NO_OP = new PauseDetectorWrapper() {
private final PauseDetector pauseDetector = new NoPauseDetector();

@Override
public void release() {
Expand All @@ -340,11 +369,16 @@ public void release() {
public void retain() {
}

@Override
public PauseDetector getPauseDetector() {
return pauseDetector;
}

};

static PauseDetectorWrapper create() {
static PauseDetectorWrapper create(boolean usePauseDetector) {

if (HDR_UTILS_AVAILABLE && LATENCY_UTILS_AVAILABLE) {
if (HDR_UTILS_AVAILABLE && LATENCY_UTILS_AVAILABLE && usePauseDetector) {
return new DefaultPauseDetectorWrapper();
}

Expand All @@ -361,6 +395,10 @@ static PauseDetectorWrapper create() {
*/
void release();

/**
* Obtain the current {@link PauseDetector}. Requires a call to {@link #retain()} first.
*/
PauseDetector getPauseDetector();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class DefaultCommandLatencyCollectorOptions implements CommandLatencyColl

public static final boolean DEFAULT_ENABLED = true;

public static final boolean DEFAULT_USE_NO_PAUSE_DETECTOR = false;

private static final DefaultCommandLatencyCollectorOptions DISABLED = builder().disable().build();

private final TimeUnit targetUnit;
Expand All @@ -50,12 +52,15 @@ public class DefaultCommandLatencyCollectorOptions implements CommandLatencyColl

private final Builder builder;

private boolean usePauseDetector;

protected DefaultCommandLatencyCollectorOptions(Builder builder) {
this.targetUnit = builder.targetUnit;
this.targetPercentiles = builder.targetPercentiles;
this.resetLatenciesAfterEvent = builder.resetLatenciesAfterEvent;
this.localDistinction = builder.localDistinction;
this.enabled = builder.enabled;
this.usePauseDetector = builder.usePauseDetector;
this.builder = builder;
}

Expand Down Expand Up @@ -117,6 +122,8 @@ public static class Builder implements CommandLatencyCollectorOptions.Builder {

private boolean enabled = DEFAULT_ENABLED;

private boolean usePauseDetector = DEFAULT_USE_NO_PAUSE_DETECTOR;

private Builder() {
}

Expand All @@ -143,6 +150,31 @@ public Builder enable() {
return this;
}

/**
* Use LatencyUtils.SimplePauseDetector to detects pauses.
* See {@link org.LatencyUtils.SimplePauseDetector}.
* Defaults to useNoPauseDetector.See {@link DefaultCommandLatencyCollectorOptions#DEFAULT_USE_NO_PAUSE_DETECTOR}.
*
* @return this {@link Builder}.
*/
@Override
public Builder usePauseDetector() {
this.usePauseDetector = true;
return this;
}

/**
* Do not detects pauses.
* Defaults to useNoPauseDetector.See {@link DefaultCommandLatencyCollectorOptions#DEFAULT_USE_NO_PAUSE_DETECTOR}.
*
* @return this {@link Builder}.
*/
@Override
public Builder useNoPauseDetector() {
this.usePauseDetector = false;
return this;
}

/**
* Set the target unit for the latencies. Defaults to {@link TimeUnit#MILLISECONDS}. See
* {@link DefaultCommandLatencyCollectorOptions#DEFAULT_TARGET_UNIT}.
Expand Down Expand Up @@ -238,4 +270,8 @@ public boolean isEnabled() {
return enabled;
}

@Override
public boolean usePauseDetector() {
return usePauseDetector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -93,7 +96,8 @@ void shutdownShouldReleasePauseDetector() {
@Test
void verifyMetrics() {

sut = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions.create());
sut = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions
.builder().usePauseDetector().build());

setupData();

Expand Down Expand Up @@ -128,6 +132,7 @@ void verifyMetrics() {
void verifyCummulativeMetrics() {

sut = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions.builder()
.usePauseDetector()
.resetLatenciesAfterEvent(false).build());

setupData();
Expand All @@ -146,4 +151,65 @@ private void setupData() {
sut.recordCommandLatency(LocalAddress.ANY, LocalAddress.ANY, CommandType.BGSAVE, MILLISECONDS.toNanos(300),
MILLISECONDS.toNanos(1000));
}

@Test
void verifyNoPauseDetector() {
int loop = 100000;

sut = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions
.builder()
// PauseDetection will not work as expected
// .usePauseDetector()
.build()
);

setupLoopData(loop);

Map<CommandLatencyId, CommandMetrics> latencies = sut.retrieveMetrics();
assertThat(latencies).hasSize(1);

Map.Entry<CommandLatencyId, CommandMetrics> entry = latencies.entrySet().iterator().next();

assertThat(entry.getKey().commandType()).isSameAs(CommandType.BGSAVE);

CommandMetrics metrics = entry.getValue();

assertThat(metrics.getCount()).isEqualTo(loop);
assertThat(metrics.getCompletion().getMin()).isBetween(990000L, 1100000L);
assertThat(metrics.getCompletion().getMax()).isBetween(990000L, 1100000L);
assertThat(metrics.getCompletion().getPercentiles()).hasSize(5);

assertThat(metrics.getFirstResponse().getMin()).isBetween(90000L, 110000L);
assertThat(metrics.getFirstResponse().getMax()).isBetween(90000L, 110000L);
assertThat(metrics.getCompletion().getPercentiles()).containsKey(50.0d);

assertThat(metrics.getFirstResponse().getPercentiles().get(50d)).isLessThanOrEqualTo(
metrics.getCompletion().getPercentiles().get(50d));

assertThat(metrics.getTimeUnit()).isEqualTo(MICROSECONDS);

assertThat(sut.retrieveMetrics()).isEmpty();

sut.shutdown();
}

private void setupLoopData(int loop) {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final Runner runner = new Runner();

for (int ndx = 0; ndx < loop; ndx++) executorService.submit(runner);
try {
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
}

class Runner implements Runnable {
@Override
public void run() {
sut.recordCommandLatency(LocalAddress.ANY, LocalAddress.ANY, CommandType.BGSAVE, MILLISECONDS.toNanos(100),
MILLISECONDS.toNanos(1000));
}
}
}

0 comments on commit c9c113f

Please sign in to comment.