Skip to content

Commit

Permalink
refactor: Moved Redis options to context
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 26, 2024
1 parent d6688ef commit bcc8b0d
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class AbstractExportCommand extends AbstractJobCommand {
@Override
protected void execute() throws Exception {
sourceRedisContext = sourceRedisContext();
sourceRedisContext.afterPropertiesSet();
try {
super.execute();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.springframework.util.CollectionUtils;

import com.redis.riot.core.AbstractJobCommand;
import com.redis.riot.core.Expression;
import com.redis.riot.core.QuietMapAccessor;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.Step;
Expand Down Expand Up @@ -87,6 +86,7 @@ protected Step<Map<String, Object>, Map<String, Object>> step(ItemReader<Map<Str
@Override
protected void execute() throws Exception {
targetRedisContext = targetRedisContext();
targetRedisContext.afterPropertiesSet();
try {
super.execute();
} finally {
Expand Down Expand Up @@ -127,24 +127,6 @@ protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
targetRedisWriterArgs.configure(writer);
}

static class ExpressionProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

private final EvaluationContext context;
private final Map<String, Expression> expressions;

public ExpressionProcessor(EvaluationContext context, Map<String, Expression> expressions) {
this.context = context;
this.expressions = expressions;
}

@Override
public Map<String, Object> process(Map<String, Object> item) throws Exception {
expressions.forEach((k, v) -> item.put(k, v.getValue(context, item)));
return item;
}

}

public RedisWriterArgs getTargetRedisWriterArgs() {
return targetRedisWriterArgs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public abstract class AbstractRedisCommand extends AbstractJobCommand {
@Override
protected void execute() throws Exception {
redisContext = redisArgs.redisContext();
redisContext.afterPropertiesSet();
try {
super.execute();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import com.redis.riot.core.processor.RegexNamedGroupFunction;
import com.redis.riot.function.KeyValueMap;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.RedisItemWriter;
import com.redis.spring.batch.item.redis.common.KeyValue;

import picocli.CommandLine.ArgGroup;
Expand All @@ -20,9 +18,6 @@ public abstract class AbstractRedisExportCommand extends AbstractExportCommand {
@ArgGroup(exclusive = false)
private RedisArgs redisArgs = new RedisArgs();

@Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE;

@Option(names = "--key-regex", description = "Regex for key-field extraction, e.g. '\\w+:(?<id>.+)' extracts an id field from the key", paramLabel = "<rex>")
private Pattern keyRegex;

Expand All @@ -31,13 +26,6 @@ protected RedisContext sourceRedisContext() {
return redisArgs.redisContext();
}

@Override
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
super.configureSourceRedisReader(reader);
log.info("Configuring Redis reader with poolSize {}", poolSize);
reader.setPoolSize(poolSize);
}

protected ItemProcessor<KeyValue<String>, Map<String, Object>> mapProcessor() {
KeyValueMap mapFunction = new KeyValueMap();
if (keyRegex != null) {
Expand All @@ -54,14 +42,6 @@ public void setRedisArgs(RedisArgs clientArgs) {
this.redisArgs = clientArgs;
}

public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}

public Pattern getKeyRegex() {
return keyRegex;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
package com.redis.riot;

import com.redis.spring.batch.item.redis.RedisItemWriter;

import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;

public abstract class AbstractRedisImportCommand extends AbstractImportCommand {

@ArgGroup(exclusive = false)
private RedisArgs redisArgs = new RedisArgs();

@Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE;

@Override
protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
super.configureTargetRedisWriter(writer);
log.info("Configuring Redis writer with poolSize {}", poolSize);
writer.setPoolSize(poolSize);
}

@Override
protected RedisContext targetRedisContext() {
return redisArgs.redisContext();
Expand All @@ -33,12 +20,4 @@ public void setRedisArgs(RedisArgs clientArgs) {
this.redisArgs = clientArgs;
}

public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public abstract class AbstractRedisTargetExportCommand extends AbstractExportCom
@Override
protected void execute() throws Exception {
targetRedisContext = targetRedisContext();
targetRedisContext.afterPropertiesSet();
try {
super.execute();
} finally {
Expand All @@ -44,12 +45,16 @@ protected void execute() throws Exception {
@Override
protected RedisContext sourceRedisContext() {
log.info("Creating source Redis context with {} {} {}", sourceRedisUri, sourceRedisArgs, sslArgs);
return sourceRedisArgs.redisContext(sourceRedisUri, sslArgs);
RedisContext context = sourceRedisArgs.redisContext(sourceRedisUri);
context.sslOptions(sslArgs.sslOptions());
return context;
}

private RedisContext targetRedisContext() {
log.info("Creating target Redis context with {} {} {}", targetRedisUri, targetRedisArgs, sslArgs);
return targetRedisArgs.redisContext(targetRedisUri, sslArgs);
RedisContext context = targetRedisArgs.redisContext(targetRedisUri);
context.sslOptions(sslArgs.sslOptions());
return context;
}

@Override
Expand All @@ -58,24 +63,13 @@ protected void configure(StandardEvaluationContext context) {
context.setVariable(VAR_TARGET, targetRedisContext.getConnection().sync());
}

@Override
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
super.configureSourceRedisReader(reader);
log.info("Configuring source Redis reader with poolSize {}", sourceRedisArgs.getPoolSize());
reader.setPoolSize(sourceRedisArgs.getPoolSize());
}

protected void configureTargetRedisReader(RedisItemReader<?, ?> reader) {
configureAsyncReader(reader);
targetRedisContext.configure(reader);
log.info("Configuring target Redis reader with poolSize {}", targetRedisArgs.getPoolSize());
reader.setPoolSize(targetRedisArgs.getPoolSize());
}

protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
targetRedisContext.configure(writer);
log.info("Configuring target Redis writer with poolSize {}", targetRedisArgs.getPoolSize());
writer.setPoolSize(targetRedisArgs.getPoolSize());
}

public RedisURI getSourceRedisUri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ public abstract class AbstractReplicateCommand extends AbstractRedisTargetExport
private static final String COMPARE_TASK_NAME = "Comparing";
private static final String STATUS_DELIMITER = " | ";

@Option(names = "--target-read-from", description = "Which target Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<n>")
private ReadFrom targetReadFrom = ReadFrom.UPSTREAM;

@Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.")
private boolean showDiffs;

Expand Down Expand Up @@ -147,21 +144,6 @@ private RedisItemReader<byte[], byte[]> compareTargetReader() {
return reader;
}

@Override
protected void configureTargetRedisReader(RedisItemReader<?, ?> reader) {
super.configureTargetRedisReader(reader);
log.info("Configuring target Redis reader with read-from {}", targetReadFrom);
reader.setReadFrom(targetReadFrom.getReadFrom());
}

public ReadFrom getTargetReadFrom() {
return targetReadFrom;
}

public void setTargetReadFrom(ReadFrom readFrom) {
this.targetReadFrom = readFrom;
}

public boolean isShowDiffs() {
return showDiffs;
}
Expand Down
26 changes: 26 additions & 0 deletions plugins/riot/src/main/java/com/redis/riot/ExpressionProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.redis.riot;

import java.util.Map;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.expression.EvaluationContext;

import com.redis.riot.core.Expression;

public class ExpressionProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

private final EvaluationContext context;
private final Map<String, Expression> expressions;

public ExpressionProcessor(EvaluationContext context, Map<String, Expression> expressions) {
this.context = context;
this.expressions = expressions;
}

@Override
public Map<String, Object> process(Map<String, Object> item) throws Exception {
expressions.forEach((k, v) -> item.put(k, v.getValue(context, item)));
return item;
}

}
14 changes: 0 additions & 14 deletions plugins/riot/src/main/java/com/redis/riot/Generate.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

@Command(name = "generate", description = "Generate Redis data structures.")
public class Generate extends AbstractRedisCommand {
Expand All @@ -26,9 +25,6 @@ public class Generate extends AbstractRedisCommand {
@ArgGroup(exclusive = false)
private GenerateArgs generateArgs = new GenerateArgs();

@Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE;

@ArgGroup(exclusive = false, heading = "Redis writer options%n")
private RedisWriterArgs redisWriterArgs = new RedisWriterArgs();

Expand All @@ -48,8 +44,6 @@ private RedisItemWriter<String, String, KeyValue<String>> writer() {
configure(writer);
log.info("Configuring Redis writer with {}", redisWriterArgs);
redisWriterArgs.configure(writer);
log.info("Configuring Redis writer with poolSize {}", poolSize);
writer.setPoolSize(poolSize);
return writer;
}

Expand Down Expand Up @@ -119,12 +113,4 @@ public void setGenerateArgs(GenerateArgs args) {
this.generateArgs = args;
}

public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}

}
45 changes: 32 additions & 13 deletions plugins/riot/src/main/java/com/redis/riot/RedisArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.redis.lettucemod.RedisURIBuilder;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.RiotVersion;
import com.redis.spring.batch.item.redis.RedisItemReader;

import io.lettuce.core.RedisURI;
import io.lettuce.core.SslVerifyMode;
Expand Down Expand Up @@ -59,10 +60,24 @@ public class RedisArgs {
@ArgGroup(exclusive = false, heading = "TLS options%n")
private SslArgs sslArgs = new SslArgs();

@Option(names = "--command-metrics", description = "Enable Lettuce command metrics", hidden = true)
private boolean metrics;
@Option(names = "--pool", description = "Max number of Redis connections (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemReader.DEFAULT_POOL_SIZE;

public RedisURI redisURI() {
@Option(names = "--read-from", description = "Which Redis cluster nodes to read from: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<name>")
private ReadFrom readFrom = ReadFrom.UPSTREAM;

public RedisContext redisContext() {
RedisContext context = new RedisContext();
context.cluster(cluster);
context.poolSize(poolSize);
context.protocolVersion(protocolVersion);
context.readFrom(readFrom.getReadFrom());
context.sslOptions(sslArgs.sslOptions());
context.uri(uri());
return context;
}

public RedisURI uri() {
RedisURIBuilder builder = new RedisURIBuilder();
builder.clientName(clientName);
builder.database(database);
Expand Down Expand Up @@ -192,25 +207,29 @@ public void setClientName(String clientName) {
this.clientName = clientName;
}

public boolean isMetrics() {
return metrics;
public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}

public void setMetrics(boolean metrics) {
this.metrics = metrics;
public ReadFrom getReadFrom() {
return readFrom;
}

public void setReadFrom(ReadFrom readFrom) {
this.readFrom = readFrom;
}

@Override
public String toString() {
return "SimpleRedisArgs [uri=" + uri + ", host=" + host + ", port=" + port + ", socket=" + socket
+ ", username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout
+ ", database=" + database + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName
+ ", cluster=" + cluster + ", protocolVersion=" + protocolVersion + ", sslArgs=" + sslArgs
+ ", metrics=" + metrics + "]";
}

public RedisContext redisContext() {
return RedisContext.create(redisURI(), cluster, protocolVersion, sslArgs, metrics);
+ ", cluster=" + cluster + ", poolSize=" + poolSize + ", protocolVersion=" + protocolVersion
+ ", readFrom=" + readFrom + ", sslArgs=" + sslArgs + "]";
}

}
Loading

0 comments on commit bcc8b0d

Please sign in to comment.