diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java index eca560be7..7f3c2f425 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java @@ -36,6 +36,7 @@ public abstract class AbstractExportCommand extends AbstractJobCommand { @Override protected void execute() throws Exception { sourceRedisContext = sourceRedisContext(); + sourceRedisContext.afterPropertiesSet(); try { super.execute(); } finally { diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java index ce6aa63a1..938e8d3a6 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java @@ -15,7 +15,6 @@ import org.springframework.util.CollectionUtils; import com.redis.riot.core.AbstractJobCommand; -import com.redis.riot.core.Expression; import com.redis.riot.core.QuietMapAccessor; import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; @@ -87,6 +86,7 @@ protected Step, Map> step(ItemReader writer) { targetRedisWriterArgs.configure(writer); } - static class ExpressionProcessor implements ItemProcessor, Map> { - - private final EvaluationContext context; - private final Map expressions; - - public ExpressionProcessor(EvaluationContext context, Map expressions) { - this.context = context; - this.expressions = expressions; - } - - @Override - public Map process(Map item) throws Exception { - expressions.forEach((k, v) -> item.put(k, v.getValue(context, item))); - return item; - } - - } - public RedisWriterArgs getTargetRedisWriterArgs() { return targetRedisWriterArgs; } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java index a8f8fe5b1..08019ae3b 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java @@ -16,6 +16,7 @@ public abstract class AbstractRedisCommand extends AbstractJobCommand { @Override protected void execute() throws Exception { redisContext = redisArgs.redisContext(); + redisContext.afterPropertiesSet(); try { super.execute(); } finally { diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisExportCommand.java index 85ee46621..d3d7416a0 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisExportCommand.java @@ -8,8 +8,6 @@ import com.redis.riot.core.processor.RegexNamedGroupFunction; import com.redis.riot.function.KeyValueMap; -import com.redis.spring.batch.item.redis.RedisItemReader; -import com.redis.spring.batch.item.redis.RedisItemWriter; import com.redis.spring.batch.item.redis.common.KeyValue; import picocli.CommandLine.ArgGroup; @@ -20,9 +18,6 @@ public abstract class AbstractRedisExportCommand extends AbstractExportCommand { @ArgGroup(exclusive = false) private RedisArgs redisArgs = new RedisArgs(); - @Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "") - private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE; - @Option(names = "--key-regex", description = "Regex for key-field extraction, e.g. '\\w+:(?.+)' extracts an id field from the key", paramLabel = "") private Pattern keyRegex; @@ -31,13 +26,6 @@ protected RedisContext sourceRedisContext() { return redisArgs.redisContext(); } - @Override - protected void configureSourceRedisReader(RedisItemReader reader) { - super.configureSourceRedisReader(reader); - log.info("Configuring Redis reader with poolSize {}", poolSize); - reader.setPoolSize(poolSize); - } - protected ItemProcessor, Map> mapProcessor() { KeyValueMap mapFunction = new KeyValueMap(); if (keyRegex != null) { @@ -54,14 +42,6 @@ public void setRedisArgs(RedisArgs clientArgs) { this.redisArgs = clientArgs; } - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - public Pattern getKeyRegex() { return keyRegex; } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisImportCommand.java index f139f8652..1dca2af4e 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisImportCommand.java @@ -1,25 +1,12 @@ package com.redis.riot; -import com.redis.spring.batch.item.redis.RedisItemWriter; - import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Option; public abstract class AbstractRedisImportCommand extends AbstractImportCommand { @ArgGroup(exclusive = false) private RedisArgs redisArgs = new RedisArgs(); - @Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "") - private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE; - - @Override - protected void configureTargetRedisWriter(RedisItemWriter writer) { - super.configureTargetRedisWriter(writer); - log.info("Configuring Redis writer with poolSize {}", poolSize); - writer.setPoolSize(poolSize); - } - @Override protected RedisContext targetRedisContext() { return redisArgs.redisContext(); @@ -33,12 +20,4 @@ public void setRedisArgs(RedisArgs clientArgs) { this.redisArgs = clientArgs; } - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisTargetExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisTargetExportCommand.java index 959d1db76..e894af3ab 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisTargetExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisTargetExportCommand.java @@ -34,6 +34,7 @@ public abstract class AbstractRedisTargetExportCommand extends AbstractExportCom @Override protected void execute() throws Exception { targetRedisContext = targetRedisContext(); + targetRedisContext.afterPropertiesSet(); try { super.execute(); } finally { @@ -44,12 +45,16 @@ protected void execute() throws Exception { @Override protected RedisContext sourceRedisContext() { log.info("Creating source Redis context with {} {} {}", sourceRedisUri, sourceRedisArgs, sslArgs); - return sourceRedisArgs.redisContext(sourceRedisUri, sslArgs); + RedisContext context = sourceRedisArgs.redisContext(sourceRedisUri); + context.sslOptions(sslArgs.sslOptions()); + return context; } private RedisContext targetRedisContext() { log.info("Creating target Redis context with {} {} {}", targetRedisUri, targetRedisArgs, sslArgs); - return targetRedisArgs.redisContext(targetRedisUri, sslArgs); + RedisContext context = targetRedisArgs.redisContext(targetRedisUri); + context.sslOptions(sslArgs.sslOptions()); + return context; } @Override @@ -58,24 +63,13 @@ protected void configure(StandardEvaluationContext context) { context.setVariable(VAR_TARGET, targetRedisContext.getConnection().sync()); } - @Override - protected void configureSourceRedisReader(RedisItemReader reader) { - super.configureSourceRedisReader(reader); - log.info("Configuring source Redis reader with poolSize {}", sourceRedisArgs.getPoolSize()); - reader.setPoolSize(sourceRedisArgs.getPoolSize()); - } - protected void configureTargetRedisReader(RedisItemReader reader) { configureAsyncReader(reader); targetRedisContext.configure(reader); - log.info("Configuring target Redis reader with poolSize {}", targetRedisArgs.getPoolSize()); - reader.setPoolSize(targetRedisArgs.getPoolSize()); } protected void configureTargetRedisWriter(RedisItemWriter writer) { targetRedisContext.configure(writer); - log.info("Configuring target Redis writer with poolSize {}", targetRedisArgs.getPoolSize()); - writer.setPoolSize(targetRedisArgs.getPoolSize()); } public RedisURI getSourceRedisUri() { diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractReplicateCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractReplicateCommand.java index b9e904e0b..c76bdea27 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractReplicateCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractReplicateCommand.java @@ -33,9 +33,6 @@ public abstract class AbstractReplicateCommand extends AbstractRedisTargetExport private static final String COMPARE_TASK_NAME = "Comparing"; private static final String STATUS_DELIMITER = " | "; - @Option(names = "--target-read-from", description = "Which target Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") - private ReadFrom targetReadFrom = ReadFrom.UPSTREAM; - @Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.") private boolean showDiffs; @@ -147,21 +144,6 @@ private RedisItemReader compareTargetReader() { return reader; } - @Override - protected void configureTargetRedisReader(RedisItemReader reader) { - super.configureTargetRedisReader(reader); - log.info("Configuring target Redis reader with read-from {}", targetReadFrom); - reader.setReadFrom(targetReadFrom.getReadFrom()); - } - - public ReadFrom getTargetReadFrom() { - return targetReadFrom; - } - - public void setTargetReadFrom(ReadFrom readFrom) { - this.targetReadFrom = readFrom; - } - public boolean isShowDiffs() { return showDiffs; } diff --git a/plugins/riot/src/main/java/com/redis/riot/ExpressionProcessor.java b/plugins/riot/src/main/java/com/redis/riot/ExpressionProcessor.java new file mode 100644 index 000000000..a0032bb63 --- /dev/null +++ b/plugins/riot/src/main/java/com/redis/riot/ExpressionProcessor.java @@ -0,0 +1,26 @@ +package com.redis.riot; + +import java.util.Map; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.expression.EvaluationContext; + +import com.redis.riot.core.Expression; + +public class ExpressionProcessor implements ItemProcessor, Map> { + + private final EvaluationContext context; + private final Map expressions; + + public ExpressionProcessor(EvaluationContext context, Map expressions) { + this.context = context; + this.expressions = expressions; + } + + @Override + public Map process(Map item) throws Exception { + expressions.forEach((k, v) -> item.put(k, v.getValue(context, item))); + return item; + } + +} \ No newline at end of file diff --git a/plugins/riot/src/main/java/com/redis/riot/Generate.java b/plugins/riot/src/main/java/com/redis/riot/Generate.java index 1519732b5..8603d3be7 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Generate.java +++ b/plugins/riot/src/main/java/com/redis/riot/Generate.java @@ -16,7 +16,6 @@ import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; -import picocli.CommandLine.Option; @Command(name = "generate", description = "Generate Redis data structures.") public class Generate extends AbstractRedisCommand { @@ -26,9 +25,6 @@ public class Generate extends AbstractRedisCommand { @ArgGroup(exclusive = false) private GenerateArgs generateArgs = new GenerateArgs(); - @Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "") - private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE; - @ArgGroup(exclusive = false, heading = "Redis writer options%n") private RedisWriterArgs redisWriterArgs = new RedisWriterArgs(); @@ -48,8 +44,6 @@ private RedisItemWriter> writer() { configure(writer); log.info("Configuring Redis writer with {}", redisWriterArgs); redisWriterArgs.configure(writer); - log.info("Configuring Redis writer with poolSize {}", poolSize); - writer.setPoolSize(poolSize); return writer; } @@ -119,12 +113,4 @@ public void setGenerateArgs(GenerateArgs args) { this.generateArgs = args; } - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java index f7248c47d..90c380ac7 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java @@ -5,6 +5,7 @@ import com.redis.lettucemod.RedisURIBuilder; import com.redis.riot.core.RiotUtils; import com.redis.riot.core.RiotVersion; +import com.redis.spring.batch.item.redis.RedisItemReader; import io.lettuce.core.RedisURI; import io.lettuce.core.SslVerifyMode; @@ -59,10 +60,24 @@ public class RedisArgs { @ArgGroup(exclusive = false, heading = "TLS options%n") private SslArgs sslArgs = new SslArgs(); - @Option(names = "--command-metrics", description = "Enable Lettuce command metrics", hidden = true) - private boolean metrics; + @Option(names = "--pool", description = "Max number of Redis connections (default: ${DEFAULT-VALUE}).", paramLabel = "") + private int poolSize = RedisItemReader.DEFAULT_POOL_SIZE; - public RedisURI redisURI() { + @Option(names = "--read-from", description = "Which Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") + private ReadFrom readFrom = ReadFrom.UPSTREAM; + + public RedisContext redisContext() { + RedisContext context = new RedisContext(); + context.cluster(cluster); + context.poolSize(poolSize); + context.protocolVersion(protocolVersion); + context.readFrom(readFrom.getReadFrom()); + context.sslOptions(sslArgs.sslOptions()); + context.uri(uri()); + return context; + } + + public RedisURI uri() { RedisURIBuilder builder = new RedisURIBuilder(); builder.clientName(clientName); builder.database(database); @@ -192,12 +207,20 @@ public void setClientName(String clientName) { this.clientName = clientName; } - public boolean isMetrics() { - return metrics; + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; } - public void setMetrics(boolean metrics) { - this.metrics = metrics; + public ReadFrom getReadFrom() { + return readFrom; + } + + public void setReadFrom(ReadFrom readFrom) { + this.readFrom = readFrom; } @Override @@ -205,12 +228,8 @@ public String toString() { return "SimpleRedisArgs [uri=" + uri + ", host=" + host + ", port=" + port + ", socket=" + socket + ", username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout + ", database=" + database + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName - + ", cluster=" + cluster + ", protocolVersion=" + protocolVersion + ", sslArgs=" + sslArgs - + ", metrics=" + metrics + "]"; - } - - public RedisContext redisContext() { - return RedisContext.create(redisURI(), cluster, protocolVersion, sslArgs, metrics); + + ", cluster=" + cluster + ", poolSize=" + poolSize + ", protocolVersion=" + protocolVersion + + ", readFrom=" + readFrom + ", sslArgs=" + sslArgs + "]"; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisContext.java b/plugins/riot/src/main/java/com/redis/riot/RedisContext.java index 4c83e0255..c3bfedc58 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisContext.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisContext.java @@ -1,5 +1,7 @@ package com.redis.riot; +import org.springframework.beans.factory.InitializingBean; + import com.redis.lettucemod.RedisModulesClientBuilder; import com.redis.lettucemod.RedisModulesUtils; import com.redis.lettucemod.api.StatefulRedisModulesConnection; @@ -8,37 +10,65 @@ import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; +import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisURI; +import io.lettuce.core.SslOptions; import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder; -import io.lettuce.core.metrics.MicrometerOptions; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.resource.ClientResources; -import io.micrometer.core.instrument.Metrics; -public class RedisContext implements AutoCloseable { +public class RedisContext implements InitializingBean, AutoCloseable { - private final RedisURI uri; - private final AbstractRedisClient client; - private final StatefulRedisModulesConnection connection; + private RedisURI uri; + private boolean cluster; + private ProtocolVersion protocolVersion; + private SslOptions sslOptions = ClientOptions.DEFAULT_SSL_OPTIONS; + private int poolSize = RedisItemReader.DEFAULT_POOL_SIZE; + private ClientResources clientResources; + private ReadFrom readFrom; - public RedisContext(RedisURI uri, AbstractRedisClient client) { - this.uri = uri; - this.client = client; + private AbstractRedisClient client; + private StatefulRedisModulesConnection connection; + + @Override + public void afterPropertiesSet() throws Exception { + RedisModulesClientBuilder clientBuilder = new RedisModulesClientBuilder(); + clientBuilder.cluster(cluster); + clientBuilder.options(clientOptions()); + clientBuilder.uri(uri); + clientBuilder.resources(clientResources); + this.client = clientBuilder.build(); this.connection = RedisModulesUtils.connection(client); } + private ClientOptions clientOptions() { + ClientOptions.Builder options = cluster ? ClusterClientOptions.builder() : ClientOptions.builder(); + options.protocolVersion(protocolVersion); + options.sslOptions(sslOptions); + return options.build(); + } + public void configure(RedisItemReader reader) { reader.setClient(client); reader.setDatabase(uri.getDatabase()); + reader.setPoolSize(poolSize); + reader.setReadFrom(readFrom); } public void configure(RedisItemWriter writer) { writer.setClient(client); + writer.setPoolSize(poolSize); } - public RedisURI getUri() { - return uri; + @Override + public void close() { + if (connection != null) { + connection.close(); + } + if (client != null) { + client.shutdown(); + client.getResources().shutdown(); + } } public AbstractRedisClient getClient() { @@ -49,35 +79,74 @@ public StatefulRedisModulesConnection getConnection() { return connection; } - @Override - public void close() { - if (connection != null) { - connection.close(); - } - if (client != null) { - client.shutdown(); - client.getResources().shutdown(); - } + public RedisURI getUri() { + return uri; } - public static RedisContext create(RedisURI uri, boolean cluster, ProtocolVersion protocolVersion, SslArgs sslArgs, - boolean metrics) { - RedisModulesClientBuilder clientBuilder = new RedisModulesClientBuilder(); - clientBuilder.cluster(cluster); - ClientOptions.Builder options = cluster ? ClusterClientOptions.builder() : ClientOptions.builder(); - options.protocolVersion(protocolVersion); - if (sslArgs != null) { - options.sslOptions(sslArgs.sslOptions()); - } - clientBuilder.options(options.build()); - clientBuilder.uri(uri); - if (metrics) { - MicrometerOptions meterOptions = MicrometerOptions.create(); - MicrometerCommandLatencyRecorder recorder = new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, - meterOptions); - clientBuilder.resources(ClientResources.builder().commandLatencyRecorder(recorder).build()); - } - return new RedisContext(uri, clientBuilder.build()); + public RedisContext uri(RedisURI uri) { + this.uri = uri; + return this; + } + + public boolean isCluster() { + return cluster; + } + + public RedisContext cluster(boolean cluster) { + this.cluster = cluster; + return this; + } + + public ProtocolVersion getProtocolVersion() { + return protocolVersion; + } + + public RedisContext protocolVersion(ProtocolVersion protocolVersion) { + this.protocolVersion = protocolVersion; + return this; + } + + public SslOptions getSslOptions() { + return sslOptions; + } + + public RedisContext sslOptions(SslOptions sslOptions) { + this.sslOptions = sslOptions; + return this; + } + + public int getPoolSize() { + return poolSize; + } + + public RedisContext poolSize(int size) { + this.poolSize = size; + return this; + } + + public ReadFrom getReadFrom() { + return readFrom; + } + + public RedisContext readFrom(ReadFrom readFrom) { + this.readFrom = readFrom; + return this; + } + + public ClientResources getClientResources() { + return clientResources; + } + + public RedisContext clientResources(ClientResources clientResources) { + this.clientResources = clientResources; + return this; + } + + @Override + public String toString() { + return "RedisContext [uri=" + uri + ", clientResources=" + clientResources + ", cluster=" + cluster + + ", protocolVersion=" + protocolVersion + ", sslOptions=" + sslOptions + ", poolSize=" + poolSize + + ", readFrom=" + readFrom + "]"; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java index a0f4afbf1..20af5bbfb 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java @@ -51,9 +51,6 @@ public class RedisReaderArgs { @Option(names = "--read-batch", description = "Number of values each reader thread should read in a pipelined call (default: ${DEFAULT-VALUE}).", paramLabel = "") private int chunkSize = DEFAULT_CHUNK_SIZE; - @Option(names = "--read-from", description = "Which Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") - private ReadFrom readFrom = ReadFrom.UPSTREAM; - @Option(names = "--mem-limit", description = "Max mem usage for a key to be read, for example 12KB 5MB. Use 0 for no limit but still read mem usage.", paramLabel = "") private DataSize memUsageLimit; @@ -94,7 +91,6 @@ public void configure(RedisItemReader reader) { reader.setPollTimeout(Duration.ofMillis(pollTimeout)); reader.setProcessor(keyProcessor(reader.getCodec(), keyFilterArgs)); reader.setQueueCapacity(queueCapacity); - reader.setReadFrom(readFrom.getReadFrom()); reader.setRetryLimit(retryLimit); reader.setScanCount(scanCount); reader.setSkipLimit(skipLimit); @@ -160,14 +156,6 @@ public void setChunkSize(int chunkSize) { this.chunkSize = chunkSize; } - public ReadFrom getReadFrom() { - return readFrom; - } - - public void setReadFrom(ReadFrom readFrom) { - this.readFrom = readFrom; - } - public DataSize getMemUsageLimit() { return memUsageLimit; } @@ -252,8 +240,8 @@ public void setSkipLimit(int skipLimit) { public String toString() { return "RedisReaderArgs [mode=" + mode + ", keyPattern=" + keyPattern + ", keyType=" + keyType + ", scanCount=" + scanCount + ", queueCapacity=" + queueCapacity + ", threads=" + threads + ", chunkSize=" + chunkSize - + ", readFrom=" + readFrom + ", memUsageLimit=" + memUsageLimit + ", memUsageSamples=" + memUsageSamples - + ", flushInterval=" + flushInterval + ", idleTimeout=" + idleTimeout + ", notificationQueueCapacity=" + + ", memUsageLimit=" + memUsageLimit + ", memUsageSamples=" + memUsageSamples + ", flushInterval=" + + flushInterval + ", idleTimeout=" + idleTimeout + ", notificationQueueCapacity=" + notificationQueueCapacity + ", retryLimit=" + retryLimit + ", skipLimit=" + skipLimit + ", keyFilterArgs=" + keyFilterArgs + ", pollTimeout=" + pollTimeout + "]"; } diff --git a/plugins/riot/src/main/java/com/redis/riot/Riot.java b/plugins/riot/src/main/java/com/redis/riot/Riot.java index 2c3239dfa..05697ccad 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Riot.java +++ b/plugins/riot/src/main/java/com/redis/riot/Riot.java @@ -1,5 +1,8 @@ package com.redis.riot; +import java.util.Arrays; +import java.util.List; + import com.redis.riot.core.MainCommand; import com.redis.riot.operation.OperationCommand; import com.redis.spring.batch.item.redis.common.Range; @@ -13,9 +16,7 @@ import picocli.CommandLine.RunFirst; import software.amazon.awssdk.regions.Region; -@Command(name = "riot", versionProvider = Versions.class, headerHeading = "A data import/export tool for Redis.%n%n", footerHeading = "%nRun 'riot COMMAND --help' for more information on a command.%n%nFor more help on how to use RIOT, head to http://redis.github.io/riot%n", subcommands = { - DatabaseExport.class, DatabaseImport.class, FakerImport.class, FileExport.class, FileImport.class, - Generate.class, Ping.class, Replicate.class, Compare.class, GenerateCompletion.class }) +@Command(name = "riot", versionProvider = Versions.class, headerHeading = "A data import/export tool for Redis.%n%n", footerHeading = "%nRun 'riot COMMAND --help' for more information on a command.%n%nFor more help on how to use RIOT, head to http://redis.github.io/riot%n") public class Riot extends MainCommand { public static void main(String[] args) { @@ -25,16 +26,58 @@ public static void main(String[] args) { @Override protected CommandLine commandLine() { CommandLine commandLine = super.commandLine(); + subcommands().forEach(commandLine::addSubcommand); IExecutionStrategy defaultStrategy = commandLine.getExecutionStrategy(); commandLine.setExecutionStrategy(r -> execute(r, defaultStrategy)); - configureCommandLine(commandLine); - return commandLine; - } - - public static void configureCommandLine(CommandLine commandLine) { commandLine.registerConverter(RedisURI.class, new RedisURIConverter()); commandLine.registerConverter(Region.class, Region::of); commandLine.registerConverter(Range.class, new RangeConverter()); + return commandLine; + } + + protected List subcommands() { + return Arrays.asList(databaseExport(), databaseImport(), fakerImport(), fileExport(), fileImport(), generate(), + ping(), replicate(), compare(), generateCompletion()); + } + + protected GenerateCompletion generateCompletion() { + return new GenerateCompletion(); + } + + protected Compare compare() { + return new Compare(); + } + + protected Replicate replicate() { + return new Replicate(); + } + + protected Ping ping() { + return new Ping(); + } + + protected Generate generate() { + return new Generate(); + } + + protected FileImport fileImport() { + return new FileImport(); + } + + protected FileExport fileExport() { + return new FileExport(); + } + + protected FakerImport fakerImport() { + return new FakerImport(); + } + + protected DatabaseImport databaseImport() { + return new DatabaseImport(); + } + + protected DatabaseExport databaseExport() { + return new DatabaseExport(); } protected int execute(ParseResult parseResult, IExecutionStrategy defaultStrategy) { diff --git a/plugins/riot/src/main/java/com/redis/riot/SourceRedisArgs.java b/plugins/riot/src/main/java/com/redis/riot/SourceRedisArgs.java index 155b9277e..2254d6588 100644 --- a/plugins/riot/src/main/java/com/redis/riot/SourceRedisArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/SourceRedisArgs.java @@ -37,15 +37,14 @@ public class SourceRedisArgs { @Option(names = "--source-resp", description = "Redis protocol version used to connect to source: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") private ProtocolVersion protocolVersion = ProtocolVersion.RESP2; - @Option(names = "--source-pool", description = "Max pool connections used for source Redis (default: ${DEFAULT-VALUE}).", paramLabel = "") + @Option(names = "--source-pool", description = "Max number of source Redis connections (default: ${DEFAULT-VALUE}).", paramLabel = "") private int poolSize = RedisItemReader.DEFAULT_POOL_SIZE; - @Option(names = "--source-command-metrics", description = "Enable Lettuce command metrics for source Redis", hidden = true) - private boolean metrics; + @Option(names = "--source-read-from", description = "Which source Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") + private ReadFrom readFrom = ReadFrom.UPSTREAM; - public RedisURI redisURI(RedisURI uri) { + private RedisURIBuilder uriBuilder() { RedisURIBuilder builder = new RedisURIBuilder(); - builder.uri(uri); builder.clientName(clientName); builder.password(password); builder.timeout(Duration.ofSeconds(timeout)); @@ -54,7 +53,17 @@ public RedisURI redisURI(RedisURI uri) { if (insecure) { builder.verifyMode(SslVerifyMode.NONE); } - return builder.build(); + return builder; + } + + public RedisContext redisContext(RedisURI uri) { + RedisContext context = new RedisContext(); + context.cluster(cluster); + context.poolSize(poolSize); + context.protocolVersion(protocolVersion); + context.readFrom(readFrom.getReadFrom()); + context.uri(uriBuilder().uri(uri).build()); + return context; } public String getUsername() { @@ -129,24 +138,20 @@ public void setClientName(String clientName) { this.clientName = clientName; } - public boolean isMetrics() { - return metrics; + public ReadFrom getReadFrom() { + return readFrom; } - public void setMetrics(boolean metrics) { - this.metrics = metrics; + public void setReadFrom(ReadFrom readFrom) { + this.readFrom = readFrom; } @Override public String toString() { return "SourceRedisArgs [username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName + ", cluster=" - + cluster + ", protocolVersion=" + protocolVersion + ", poolSize=" + poolSize + ", metrics=" + metrics + + cluster + ", protocolVersion=" + protocolVersion + ", poolSize=" + poolSize + ", readFrom=" + readFrom + "]"; } - public RedisContext redisContext(RedisURI uri, SslArgs sslArgs) { - return RedisContext.create(redisURI(uri), cluster, protocolVersion, sslArgs, metrics); - } - } diff --git a/plugins/riot/src/main/java/com/redis/riot/TargetRedisArgs.java b/plugins/riot/src/main/java/com/redis/riot/TargetRedisArgs.java index 34b200d7c..2c6fb7420 100644 --- a/plugins/riot/src/main/java/com/redis/riot/TargetRedisArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/TargetRedisArgs.java @@ -38,15 +38,14 @@ public class TargetRedisArgs { @Option(names = "--target-resp", description = "Redis protocol version used to connect to target: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") private ProtocolVersion protocolVersion = ProtocolVersion.RESP2; - @Option(names = "--target-pool", description = "Max pool connections used for target Redis (default: ${DEFAULT-VALUE}).", paramLabel = "") + @Option(names = "--target-pool", description = "Max number of target Redis connections (default: ${DEFAULT-VALUE}).", paramLabel = "") private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE; - @Option(names = "--target-command-metrics", description = "Enable Lettuce command metrics for target Redis", hidden = true) - private boolean metrics; + @Option(names = "--target-read-from", description = "Which target Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") + private ReadFrom readFrom = ReadFrom.UPSTREAM; - public RedisURI redisURI(RedisURI uri) { + private RedisURIBuilder uriBuilder() { RedisURIBuilder builder = new RedisURIBuilder(); - builder.uri(uri); builder.clientName(clientName); builder.password(password); builder.timeout(Duration.ofSeconds(timeout)); @@ -55,11 +54,17 @@ public RedisURI redisURI(RedisURI uri) { if (insecure) { builder.verifyMode(SslVerifyMode.NONE); } - return builder.build(); + return builder; } - public RedisContext redisContext(RedisURI uri, SslArgs sslArgs) { - return RedisContext.create(redisURI(uri), cluster, protocolVersion, sslArgs, metrics); + public RedisContext redisContext(RedisURI uri) { + RedisContext context = new RedisContext(); + context.cluster(cluster); + context.poolSize(poolSize); + context.protocolVersion(protocolVersion); + context.readFrom(readFrom.getReadFrom()); + context.uri(uriBuilder().uri(uri).build()); + return context; } public String getUsername() { @@ -134,19 +139,19 @@ public void setClientName(String clientName) { this.clientName = clientName; } - public boolean isMetrics() { - return metrics; + public ReadFrom getReadFrom() { + return readFrom; } - public void setMetrics(boolean metrics) { - this.metrics = metrics; + public void setReadFrom(ReadFrom readFrom) { + this.readFrom = readFrom; } @Override public String toString() { return "TargetRedisArgs [username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName + ", cluster=" - + cluster + ", protocolVersion=" + protocolVersion + ", poolSize=" + poolSize + ", metrics=" + metrics + + cluster + ", protocolVersion=" + protocolVersion + ", poolSize=" + poolSize + ", readFrom=" + readFrom + "]"; } diff --git a/plugins/riot/src/test/java/com/redis/riot/RedisArgsTests.java b/plugins/riot/src/test/java/com/redis/riot/RedisArgsTests.java index 3a336adce..4ec7fd832 100644 --- a/plugins/riot/src/test/java/com/redis/riot/RedisArgsTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/RedisArgsTests.java @@ -10,12 +10,12 @@ class RedisArgsTests { @Test - void simpleRedisArgsURI() { + void redisArgsURI() { RedisArgs args = new RedisArgs(); RedisURI baseUri = RedisURI.create("redis://localhost"); args.setUri(baseUri); args.setClientName("ansdf"); - RedisURI uri = args.redisURI(); + RedisURI uri = args.uri(); Assertions.assertEquals(baseUri.getHost(), uri.getHost()); Assertions.assertEquals(baseUri.getPort(), uri.getPort()); Assertions.assertEquals(args.getClientName(), uri.getClientName());