Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-10010: Sample Redis ops per second during operations in test #7358

Merged
merged 6 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.NativeRedisTestRule;

public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static NativeRedisTestRule redis = new NativeRedisTestRule();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@
package org.apache.geode.redis.internal.commands.executor.server;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.offset;
import static org.assertj.core.api.Assertions.withinPercentage;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.data.Offset;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;

Expand All @@ -34,17 +42,19 @@
import org.apache.geode.redis.RedisIntegrationTest;
import org.apache.geode.redis.RedisTestHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest {
public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest {
@Rule
public ExecutorServiceRule executor = new ExecutorServiceRule();

private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis();
private static final String EXISTING_HASH_KEY = "Existing_Hash";
private static final String EXISTING_STRING_KEY = "Existing_String";
private static final String EXISTING_SET_KEY_1 = "Existing_Set_1";
private static final String EXISTING_SET_KEY_2 = "Existing_Set_2";

private Jedis jedis;
private static long START_TIME;
private static long startTime;
private static StatisticsClock statisticsClock;

private long preTestConnectionsReceived = 0;
Expand Down Expand Up @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte
@BeforeClass
public static void beforeClass() {
statisticsClock = new EnabledStatisticsClock();
START_TIME = statisticsClock.getTime();
startTime = statisticsClock.getTime();
}

@Before
public void before() {
jedis = new Jedis("localhost", getPort(), TIMEOUT);
jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
numInfoCalled.set(0);

long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED));
Expand Down Expand Up @@ -157,27 +167,57 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() {
}

@Test
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() {
int NUMBER_SECONDS_TO_RUN = 10;
AtomicInteger numberOfCommandsExecuted = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
numberOfCommandsExecuted.getAndIncrement();
return true;
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring()
throws InterruptedException, ExecutionException, TimeoutException {
final long numberSecondsToRun = 4;
AtomicInteger totalOpsPerformed = new AtomicInteger();

final long startTime = System.currentTimeMillis();
final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
final long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set("key", "value");
totalOpsPerformed.getAndIncrement();
}
jedis2.close();
});
double reportedCommandsPerLastSecond =

// Record the total number of operations performed a second before we plan to sample. A poll
// interval less than the default of 100ms is used to increase the accuracy of the expected
// value, as the stats update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed);
final int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get();

// Calculate how many operations were performed in the last second. A poll interval less than
// the default of 100ms is used to increase the accuracy of the expected value, as the stats
// update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);

final double reportedCommandsPerLastSecond =
Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND));

long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN;
final int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling;

assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0));
assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(12.5));

// if time passes w/o operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true);
executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(1.0));
// Wait two seconds with no operations
Thread.sleep(2000);

// Confirm that instantaneous operations per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0,
offset(1.0));
}

@Test
Expand All @@ -193,39 +233,72 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() {
}

@Test
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() {

double REASONABLE_SOUNDING_OFFSET = .8;
int NUMBER_SECONDS_TO_RUN = 5;
String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n";
int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length();
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond()
throws InterruptedException, ExecutionException, TimeoutException {
final int numberSecondsToRun = 4;
final String command = "set";
final String key = "key";
final String value = "value";
final int bytesSentPerCommand =
("*3\r\n$" + command.length() + "\r\n" + command +
"\r\n$" + key.length() + "\r\n" + key +
"\r\n$" + value.length() + "\r\n" + value +
"\r\n").length();
AtomicInteger totalBytesSent = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND);
return true;
final long startTime = System.currentTimeMillis();
final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
final long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set(key, value);
totalBytesSent.addAndGet(bytesSentPerCommand);
}
jedis2.close();
});
double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));
double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000;

assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET));
// Record the total number of KB sent a second before we plan to sample. A poll interval less
// than the default of 100ms is used to increase the accuracy of the expected value, as the
// stats update the value of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent);
final int bytesSentUntilASecondBeforeSampling = totalBytesSent.get();

// Calculate how many KB were sent in the last second. A poll interval less than the default of
// 100ms is used to increase the accuracy of the expected value, as the stats update the value
// of instantaneous per second values every 62.5ms
await().pollInterval(Duration.ofMillis(10))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);

final double reportedKBReadPerLastSecond =
Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));

final double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0;

assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(12.5));

executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

// if time passes w/o operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS)
.until(() -> true);
// Wait two seconds with no operations
Thread.sleep(2000);

// Kb/s should eventually drop to 0 or at least very close since just executing the info
// command may result in the value increasing.
assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(0.1));
// Confirm that instantaneous KB read per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0, offset(0.02));
}

// ------------------- Clients Section -------------------------- //

@Test
public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectedClients(jedis, preTestConnectedClients, 1);
Expand All @@ -237,7 +310,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {

@Test
public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectionsReceived(jedis, preTestConnectionsReceived, 1);
Expand Down Expand Up @@ -276,7 +349,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() {

// ------------------- Helper Methods ----------------------------- //
public long getStartTime() {
return START_TIME;
return startTime;
}

public long getCurrentTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.redis.GeodeRedisServerRule;

public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static GeodeRedisServerRule server = new GeodeRedisServerRule();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package org.apache.geode.redis.internal.statistics;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime;
import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;

import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -37,23 +38,27 @@ public class RedisStats {
private final AtomicLong keyspaceMisses = new AtomicLong();
private final AtomicLong uniqueChannelSubscriptions = new AtomicLong();
private final AtomicLong uniquePatternSubscriptions = new AtomicLong();
private final ScheduledExecutorService perSecondExecutor;

private static final int ROLLING_AVERAGE_SAMPLES_PER_SECOND = 16;
private final ScheduledExecutorService rollingAverageExecutor;
private int rollingAverageTick = 0;
private final RollingAverageStat networkBytesReadRollingAverageStat = new RollingAverageStat();
private volatile double networkKiloBytesReadOverLastSecond;
private volatile long opsPerformedLastTick;
private double opsPerformedOverLastSecond;
private long previousNetworkBytesRead;
private final RollingAverageStat opsPerformedRollingAverageStat = new RollingAverageStat();
private volatile double opsPerformedOverLastSecond;

private final StatisticsClock clock;
private final GeodeRedisStats geodeRedisStats;
private final long START_TIME_IN_NANOS;
private final long startTimeInNanos;


public RedisStats(StatisticsClock clock,
GeodeRedisStats geodeRedisStats) {

this.clock = clock;
this.geodeRedisStats = geodeRedisStats;
perSecondExecutor = startPerSecondUpdater();
START_TIME_IN_NANOS = clock.getTime();
rollingAverageExecutor = startRollingAverageUpdater();
startTimeInNanos = clock.getTime();
}

public void incCommandsProcessed() {
Expand Down Expand Up @@ -114,7 +119,7 @@ public long getCurrentTimeNanos() {
}

public long getUptimeInMilliseconds() {
long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS;
long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos;
return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos);
}

Expand Down Expand Up @@ -194,43 +199,46 @@ public void changeUniquePatternSubscriptions(long delta) {

public void close() {
geodeRedisStats.close();
stopPerSecondUpdater();
stopRollingAverageUpdater();
}

private ScheduledExecutorService startPerSecondUpdater() {
int INTERVAL = 1;
private ScheduledExecutorService startRollingAverageUpdater() {
long microsPerSecond = 1_000_000;
final long delayMicros = microsPerSecond / ROLLING_AVERAGE_SAMPLES_PER_SECOND;

ScheduledExecutorService perSecondExecutor =
newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-");
ScheduledExecutorService rollingAverageExecutor =
newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-");

perSecondExecutor.scheduleWithFixedDelay(
this::doPerSecondUpdates,
INTERVAL,
INTERVAL,
SECONDS);
rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros,
delayMicros, MICROSECONDS);

return perSecondExecutor;
return rollingAverageExecutor;
}

private void stopPerSecondUpdater() {
perSecondExecutor.shutdownNow();
private void stopRollingAverageUpdater() {
rollingAverageExecutor.shutdownNow();
}

private void doPerSecondUpdates() {
updateNetworkKilobytesReadLastSecond();
updateOpsPerformedOverLastSecond();
private void doRollingAverageUpdates() {
networkKiloBytesReadOverLastSecond = networkBytesReadRollingAverageStat
.calculate(getTotalNetworkBytesRead(), rollingAverageTick) / 1024.0;
opsPerformedOverLastSecond =
opsPerformedRollingAverageStat.calculate(getCommandsProcessed(), rollingAverageTick);
rollingAverageTick++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this suggestion but I think it would be better if "rollingAverageTick" was an instance var on RollingAverageStat. Then all this tick manipulation code would happen inside RollingAverageStat.calculate. I know this will use a little bit more memory and time but it does allow the possibility of each RollingAverageStat to have its own number of samples it will average. As it is now when you just look at the calculate method it is unclear how the array index is managed and if we might have an out of bounds index. If this code was moved to calculate it would be clearer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea. Since the tick count is inherently tied to the samples per second, which is also used when determining the delay for the ScheduledExecutorService, it's not possible to have different instances of RollingAverageStat with different number of samples to average over without also having a different instance of ScheduledExecutorService to execute them, but it still cleans things up to move the tick logic into RollingAverageStat.

To make the RollingAverageStat instances more self-contained, I also moved the call to get the current stat value inside the class, so all you need to do is create an instance, then call a zero-argument calculate() method to get the latest average.

if (rollingAverageTick >= ROLLING_AVERAGE_SAMPLES_PER_SECOND) {
rollingAverageTick = 0;
}
}

private void updateNetworkKilobytesReadLastSecond() {
final long totalNetworkBytesRead = getTotalNetworkBytesRead();
long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead;
networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0;
previousNetworkBytesRead = totalNetworkBytesRead;
}
private static class RollingAverageStat {
private long valueReadLastTick;
private final long[] valuesReadOverLastNSamples = new long[ROLLING_AVERAGE_SAMPLES_PER_SECOND];

private void updateOpsPerformedOverLastSecond() {
final long totalOpsPerformed = getCommandsProcessed();
opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick;
opsPerformedLastTick = totalOpsPerformed;
long calculate(long currentValue, int tickNumber) {
long delta = currentValue - valueReadLastTick;
valueReadLastTick = currentValue;
valuesReadOverLastNSamples[tickNumber] = delta;
return Arrays.stream(valuesReadOverLastNSamples).sum();
}
}
}