diff --git a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java index 8fef49f5c..3e8cc2bf2 100644 --- a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java +++ b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java @@ -8,6 +8,7 @@ import org.springframework.expression.Expression; import org.springframework.util.Assert; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; import com.redis.lettucemod.api.sync.RediSearchCommands; import com.redis.lettucemod.search.Field; import com.redis.lettucemod.search.IndexInfo; @@ -83,10 +84,13 @@ private Map fields() { private Map searchIndexFields() { Map searchFields = new LinkedHashMap<>(); - RediSearchCommands commands = getRedisConnection().sync(); - IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex)); - for (Field field : info.getFields()) { - searchFields.put(field.getName(), RiotUtils.parse(expression(field))); + try (StatefulRedisModulesConnection connection = RedisModulesUtils + .connection(getRedisClient())) { + RediSearchCommands commands = connection.sync(); + IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex)); + for (Field field : info.getFields()) { + searchFields.put(field.getName(), RiotUtils.parse(expression(field))); + } } return searchFields; } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java index 9b69355c5..f335a13d7 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java @@ -1,7 +1,6 @@ package com.redis.riot.redis; import java.io.PrintWriter; -import java.time.Duration; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -14,6 +13,9 @@ import org.springframework.batch.repeat.RepeatStatus; import org.springframework.util.Assert; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.api.sync.RedisModulesCommands; +import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.AbstractRunnable; import io.lettuce.core.metrics.CommandMetrics.CommandLatency; @@ -32,20 +34,11 @@ public class Ping extends AbstractRunnable { private TimeUnit timeUnit = DEFAULT_TIME_UNIT; private boolean latencyDistribution; private double[] percentiles = DEFAULT_PERCENTILES; - private Duration sleep; public void setOut(PrintWriter out) { this.out = out; } - public void setSleep(Duration sleep) { - this.sleep = sleep; - } - - public Duration getSleep() { - return sleep; - } - public int getIterations() { return iterations; } @@ -92,39 +85,44 @@ protected Job job() { CallableTaskletAdapter tasklet = new CallableTaskletAdapter(); tasklet.setCallable(this::call); step.setName(getName()); + step.setTransactionManager(getJobFactory().getPlatformTransactionManager()); step.setJobRepository(getJobFactory().getJobRepository()); step.setTasklet(tasklet); return jobBuilder().start(step).build(); } private RepeatStatus call() { - for (int iteration = 0; iteration < iterations; iteration++) { - LatencyStats stats = new LatencyStats(); - for (int index = 0; index < count; index++) { - long startTime = System.nanoTime(); - String reply = getRedisConnection().sync().ping(); - Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply); - stats.recordLatency(System.nanoTime() - startTime); - } - Histogram histogram = stats.getIntervalHistogram(); - if (latencyDistribution) { - histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1)); - } - Map percentileMap = new TreeMap<>(); - for (double targetPercentile : percentiles) { - long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile)); - percentileMap.put(targetPercentile, percentile); - } - long min = toTimeUnit(histogram.getMinValue()); - long max = toTimeUnit(histogram.getMaxValue()); - CommandLatency latency = new CommandLatency(min, max, percentileMap); - out.println(latency.toString()); - if (sleep != null) { - try { - Thread.sleep(sleep.toMillis()); - } catch (InterruptedException e) { - // Restore interrupted state... - Thread.currentThread().interrupt(); + try (StatefulRedisModulesConnection connection = RedisModulesUtils + .connection(getRedisClient())) { + RedisModulesCommands commands = connection.sync(); + for (int iteration = 0; iteration < iterations; iteration++) { + LatencyStats stats = new LatencyStats(); + for (int index = 0; index < count; index++) { + long startTime = System.nanoTime(); + String reply = commands.ping(); + Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply); + stats.recordLatency(System.nanoTime() - startTime); + } + Histogram histogram = stats.getIntervalHistogram(); + if (latencyDistribution) { + histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1)); + } + Map percentileMap = new TreeMap<>(); + for (double targetPercentile : percentiles) { + long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile)); + percentileMap.put(targetPercentile, percentile); + } + long min = toTimeUnit(histogram.getMinValue()); + long max = toTimeUnit(histogram.getMaxValue()); + CommandLatency latency = new CommandLatency(min, max, percentileMap); + out.println(latency.toString()); + if (getSleep() != null) { + try { + Thread.sleep(getSleep().toMillis()); + } catch (InterruptedException e) { + // Restore interrupted state... + Thread.currentThread().interrupt(); + } } } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java index e24c49725..dbc93b870 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java @@ -18,6 +18,8 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.expression.spel.support.StandardEvaluationContext; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.AbstractExport; import com.redis.riot.core.RedisClientOptions; import com.redis.riot.core.RedisWriterOptions; @@ -159,8 +161,9 @@ protected FaultTolerantStepBuilder step(String name, ItemReader } private void checkKeyspaceNotificationEnabled() { - try { - String config = getRedisConnection().sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS) + try (StatefulRedisModulesConnection connection = RedisModulesUtils + .connection(getRedisClient())) { + String config = connection.sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS) .getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, ""); if (!config.contains("K")) { log.error( diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index 1779b1590..a124b0f13 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -9,6 +9,8 @@ import org.springframework.batch.item.support.PassThroughItemProcessor; import org.springframework.expression.spel.support.StandardEvaluationContext; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.function.DropStreamMessageIdFunction; import com.redis.riot.core.function.ExpressionFunction; import com.redis.riot.core.function.KeyValueOperator; @@ -43,7 +45,8 @@ protected ItemProcessor, KeyValue> processor( protected StandardEvaluationContext evaluationContext() { StandardEvaluationContext evaluationContext = evaluationContextOptions.evaluationContext(); - evaluationContext.setVariable(REDIS_VAR, getRedisConnection().sync()); + StatefulRedisModulesConnection connection = RedisModulesUtils.connection(getRedisClient()); + evaluationContext.setVariable(REDIS_VAR, connection.sync()); return evaluationContext; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRunnable.java index c023d3d34..be7598a5f 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRunnable.java @@ -1,8 +1,5 @@ package com.redis.riot.core; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.util.RedisModulesUtils; - import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.RedisURI; @@ -12,16 +9,13 @@ public abstract class AbstractRunnable extends AbstractJobRunnable { private RedisURI redisURI; private AbstractRedisClient redisClient; - private StatefulRedisModulesConnection redisConnection; @Override public void run() { redisURI = redisClientOptions.redisURI(); try { redisClient = redisClientOptions.client(redisURI); - redisConnection = RedisModulesUtils.connection(redisClient); super.run(); - redisConnection.close(); } finally { redisClient.close(); redisClient.getResources().shutdown(); @@ -44,8 +38,4 @@ protected AbstractRedisClient getRedisClient() { return redisClient; } - protected StatefulRedisModulesConnection getRedisConnection() { - return redisConnection; - } - }