Skip to content

Commit

Permalink
refactor: Simplified Redis connection options
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Jul 6, 2022
1 parent c1720d7 commit a7bbedd
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ private TaskletStep scanStep() throws Exception {
List<ItemProcessor<KeyValue<byte[], ?>, KeyValue<byte[], ?>>> processors = new ArrayList<>();
processorOptions.getKeyProcessor().ifPresent(p -> {
EvaluationContext context = new StandardEvaluationContext();
context.setVariable("src", getRedisOptions().uris().get(0));
context.setVariable("dest", targetRedisOptions.uris().get(0));
context.setVariable("src", getRedisOptions().uri());
context.setVariable("dest", targetRedisOptions.uri());
processors.add(new KeyValueKeyProcessor<>(parser.parseExpression(p, new TemplateParserContext()), context));
});
processorOptions.getTtlProcessor().ifPresent(p -> processors
Expand All @@ -141,7 +141,7 @@ private TaskletStep liveReplicationStep() throws Exception {
RedisItemReader<byte[], ?> reader = flushingTransferOptions
.configure(reader().live().keyPatterns(readerOptions.getScanMatch())
.notificationQueueCapacity(replicationOptions.getNotificationQueueCapacity())
.database(getRedisOptions().uris().get(0).getDatabase()))
.database(getRedisOptions().uri().getDatabase()))
.build();
reader.setName("redis-live-reader");
return flushingTransferOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected RiotRedis app() {

private void configureReplicateCommand(CommandLine.ParseResult parseResult) {
ReplicateCommand command = parseResult.subcommand().commandSpec().commandLine().getCommand();
command.getTargetRedisOptions().setUris(new RedisURI[] { RedisURI.create(targetRedis.getRedisURI()) });
command.getTargetRedisOptions().setUri(RedisURI.create(targetRedis.getRedisURI()));
if (command.getReplicationOptions().getMode() == ReplicationMode.LIVE) {
command.getFlushingTransferOptions().setIdleTimeout(IDLE_TIMEOUT);
command.getReplicationOptions().setNotificationQueueCapacity(100000);
Expand Down
98 changes: 33 additions & 65 deletions core/riot-core/src/main/java/com/redis/riot/RedisOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
Expand All @@ -34,8 +32,6 @@ public class RedisOptions {

public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_PORT = 6379;
public static final int DEFAULT_DATABASE = 0;
public static final int DEFAULT_TIMEOUT = 60;

@Option(names = { "-h",
"--hostname" }, description = "Server hostname (default: ${DEFAULT-VALUE}).", paramLabel = "<host>")
Expand All @@ -50,18 +46,18 @@ public class RedisOptions {
@Option(names = { "-a",
"--pass" }, arity = "0..1", interactive = true, description = "Password to use when connecting to the server.", paramLabel = "<password>")
private Optional<char[]> password = Optional.empty();
@Option(names = { "-u", "--uri" }, arity = "1..*", description = "Server URI.", paramLabel = "<uri>")
private RedisURI[] uris;
@Option(names = { "-u", "--uri" }, description = "Server URI.", paramLabel = "<uri>")
private RedisURI uri;
@Option(names = "--timeout", description = "Redis command timeout (default: ${DEFAULT-VALUE}).", paramLabel = "<sec>")
private long timeout = DEFAULT_TIMEOUT;
private OptionalLong timeout = OptionalLong.empty();
@Option(names = { "-n", "--db" }, description = "Database number (default: ${DEFAULT-VALUE}).", paramLabel = "<db>")
private int database = DEFAULT_DATABASE;
private OptionalInt database = OptionalInt.empty();
@Option(names = { "-c", "--cluster" }, description = "Enable cluster mode.")
private boolean cluster;
@Option(names = "--tls", description = "Establish a secure TLS connection.")
private boolean tls;
private Optional<Boolean> tls = Optional.empty();
@Option(names = "--insecure", description = "Allow insecure TLS connection by skipping cert validation.")
private boolean verifyPeer = true;
private Optional<Boolean> insecure = Optional.empty();
@Option(names = "--ks", description = "Path to keystore.", paramLabel = "<file>")
private Optional<File> keystore = Optional.empty();
@Option(names = "--ks-password", arity = "0..1", interactive = true, description = "Keystore password.", paramLabel = "<pwd>")
Expand Down Expand Up @@ -113,40 +109,24 @@ public void setPassword(char[] password) {
this.password = Optional.of(password);
}

public void setUris(RedisURI[] uris) {
this.uris = uris;
}

public long getTimeout() {
return timeout;
public void setUri(RedisURI uri) {
this.uri = uri;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

public int getDatabase() {
return database;
this.timeout = OptionalLong.of(timeout);
}

public void setDatabase(int database) {
this.database = database;
}

public boolean isTls() {
return tls;
this.database = OptionalInt.of(database);
}

public void setTls(boolean tls) {
this.tls = tls;
this.tls = Optional.of(tls);
}

public boolean isVerifyPeer() {
return verifyPeer;
}

public void setVerifyPeer(boolean verifyPeer) {
this.verifyPeer = verifyPeer;
public void setInsecure(boolean insecure) {
this.insecure = Optional.of(insecure);
}

public void setKeystore(File keystore) {
Expand Down Expand Up @@ -200,29 +180,17 @@ public void shutdown() {
}
}

public List<RedisURI> uris() {
List<RedisURI> redisURIs = new ArrayList<>();
if (ObjectUtils.isEmpty(uris)) {
RedisURI uri = RedisURI.create(host, port);
socket.ifPresent(uri::setSocket);
uri.setSsl(tls);
redisURIs.add(uri);
} else {
redisURIs.addAll(Arrays.asList(uris));
}
for (RedisURI uri : redisURIs) {
uri.setVerifyPeer(verifyPeer);
username.ifPresent(uri::setUsername);
password.ifPresent(uri::setPassword);
if (database != uri.getDatabase()) {
uri.setDatabase(database);
}
if (timeout != uri.getTimeout().getSeconds()) {
uri.setTimeout(Duration.ofSeconds(timeout));
}
clientName.ifPresent(uri::setClientName);
}
return redisURIs;
public RedisURI uri() {
RedisURI redisURI = uri == null ? RedisURI.create(host, port) : uri;
insecure.ifPresent(b -> redisURI.setVerifyPeer(!b));
tls.ifPresent(redisURI::setSsl);
socket.ifPresent(redisURI::setSocket);
username.ifPresent(redisURI::setUsername);
password.ifPresent(redisURI::setPassword);
database.ifPresent(redisURI::setDatabase);
timeout.ifPresent(t -> redisURI.setTimeout(Duration.ofSeconds(t)));
clientName.ifPresent(redisURI::setClientName);
return redisURI;
}

private ClientResources clientResources() {
Expand Down Expand Up @@ -276,7 +244,7 @@ public AbstractRedisClient client() {
public RedisModulesClusterClient redisModulesClusterClient() {
if (client == null) {
log.debug("Creating Redis cluster client: {}", this);
RedisModulesClusterClient clusterClient = RedisModulesClusterClient.create(clientResources(), uris());
RedisModulesClusterClient clusterClient = RedisModulesClusterClient.create(clientResources(), uri());
clusterClient.setOptions(
ClusterClientOptions.builder().autoReconnect(autoReconnect).sslOptions(sslOptions()).build());
this.client = clusterClient;
Expand All @@ -287,7 +255,7 @@ public RedisModulesClusterClient redisModulesClusterClient() {
public RedisModulesClient redisModulesClient() {
if (client == null) {
log.debug("Creating Redis client: {}", this);
RedisModulesClient redisClient = RedisModulesClient.create(clientResources(), uris().get(0));
RedisModulesClient redisClient = RedisModulesClient.create(clientResources(), uri());
redisClient
.setOptions(ClientOptions.builder().autoReconnect(autoReconnect).sslOptions(sslOptions()).build());
this.client = redisClient;
Expand All @@ -298,11 +266,11 @@ public RedisModulesClient redisModulesClient() {
@Override
public String toString() {
return "RedisOptions [host=" + host + ", port=" + port + ", socket=" + socket + ", username=" + username
+ ", password=" + password + ", uris=" + Arrays.toString(uris) + ", timeout=" + timeout + ", database="
+ database + ", cluster=" + cluster + ", tls=" + tls + ", verifyPeer=" + verifyPeer + ", keystore="
+ keystore + ", keystorePassword=" + keystorePassword + ", truststore=" + truststore
+ ", truststorePassword=" + truststorePassword + ", cert=" + cert + ", showMetrics=" + showMetrics
+ ", autoReconnect=" + autoReconnect + ", clientName=" + clientName + "]";
+ ", password=" + password + ", uri=" + uri + ", timeout=" + timeout + ", database=" + database
+ ", cluster=" + cluster + ", tls=" + tls + ", insecure=" + insecure + ", keystore=" + keystore
+ ", keystorePassword=" + keystorePassword + ", truststore=" + truststore + ", truststorePassword="
+ truststorePassword + ", cert=" + cert + ", showMetrics=" + showMetrics + ", autoReconnect="
+ autoReconnect + ", clientName=" + clientName + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected int execute(String filename, RedisTestContext redis, Consumer<CommandL

protected void configure(RiotApp app, RedisTestContext redis) {
app.getLoggingOptions().setStacktrace(true);
app.getRedisOptions().setUris(new RedisURI[] { RedisURI.create(redis.getRedisURI()) });
app.getRedisOptions().setUri(RedisURI.create(redis.getRedisURI()));
app.getRedisOptions().setCluster(redis.isCluster());
}

Expand Down

0 comments on commit a7bbedd

Please sign in to comment.