Skip to content

Commit

Permalink
refactor!: Moved redis args
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 25, 2024
1 parent c35f242 commit 355c087
Show file tree
Hide file tree
Showing 82 changed files with 1,677 additions and 1,400 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.3
4.0.0-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,22 @@ public class FakerItemReader extends AbstractItemCountingItemStreamItemReader<Ma
private Locale locale = DEFAULT_LOCALE;
private StandardEvaluationContext evaluationContext;

private int maxItemCount;

public FakerItemReader() {
setName(ClassUtils.getShortName(getClass()));
}

public int getMaxItemCount() {
return maxItemCount;
}

@Override
public void setMaxItemCount(int count) {
super.setMaxItemCount(count);
this.maxItemCount = count;
}

public void setLocale(Locale locale) {
this.locale = locale;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ public void setLineSeparator(String lineSeparator) {
this.lineSeparator = lineSeparator;
}

@Override
protected boolean isStruct() {
return true;
}

private ItemWriter<KeyValue<String, Object>> writer() {
WritableResource resource;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.junit.jupiter.api.TestInfo;

import com.amazonaws.util.IOUtils;
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.operation.HsetBuilder;
import com.redis.spring.batch.test.AbstractTestBase;

Expand All @@ -31,7 +30,8 @@ abstract class AbstractFileTests extends AbstractTestBase {
@Test
void fileImportJSON(TestInfo info) throws Exception {
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.getRedisClientOptions().setRedisURI(redisURI);
executable.getRedisClientOptions().setCluster(getRedisServer().isRedisCluster());
executable.setFiles(BEERS_JSON_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
Expand All @@ -52,18 +52,12 @@ void fileImportJSON(TestInfo info) throws Exception {
Assertions.assertEquals("Hocus Pocus", beer1.get("name"));
}

private RedisClientOptions redisClientOptions() {
RedisClientOptions options = new RedisClientOptions();
options.setCluster(getRedisServer().isRedisCluster());
options.setUri(getRedisServer().getRedisURI());
return options;
}

@SuppressWarnings("unchecked")
@Test
void fileApiImportCSV(TestInfo info) throws Exception {
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.getRedisClientOptions().setRedisURI(redisURI);
executable.getRedisClientOptions().setCluster(getRedisServer().isRedisCluster());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setName(name(info));
Expand Down Expand Up @@ -92,7 +86,8 @@ void fileApiFileExpansion(TestInfo info) throws Exception {
File file2 = temp.resolve("beers2.csv").toFile();
IOUtils.copy(getClass().getClassLoader().getResourceAsStream("beers2.csv"), new FileOutputStream(file2));
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.getRedisClientOptions().setRedisURI(redisURI);
executable.getRedisClientOptions().setCluster(getRedisServer().isRedisCluster());
executable.setFiles(temp.resolve("*.csv").toFile().getPath());
executable.setHeader(true);
executable.setName(name(info));
Expand All @@ -116,7 +111,8 @@ void fileApiFileExpansion(TestInfo info) throws Exception {
@Test
void fileImportCSVMultiThreaded(TestInfo info) throws Exception {
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.getRedisClientOptions().setRedisURI(redisURI);
executable.getRedisClientOptions().setCluster(getRedisServer().isRedisCluster());
executable.setFiles("https://storage.googleapis.com/jrx/beers.csv");
executable.setHeader(true);
executable.setThreads(3);
Expand All @@ -141,7 +137,8 @@ void fileImportCSVMultiThreaded(TestInfo info) throws Exception {
@Test
void fileImportJSONL(TestInfo info) throws Exception {
try (FileImport executable = new FileImport()) {
executable.setRedisClientOptions(redisClientOptions());
executable.getRedisClientOptions().setRedisURI(redisURI);
executable.getRedisClientOptions().setCluster(getRedisServer().isRedisCluster());
executable.setFiles(BEERS_JSONL_URL);
HsetBuilder hsetBuilder = new HsetBuilder();
hsetBuilder.setKeyspace(KEYSPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.ByteArrayCodec;

public class Replication extends AbstractExport {
Expand Down Expand Up @@ -74,26 +73,19 @@ public class Replication extends AbstractExport {
private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;
private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;

private RedisURI targetRedisURI;
private AbstractRedisClient targetRedisClient;

@Override
protected boolean isStruct() {
return type == ReplicationType.STRUCT;
}

@Override
public void afterPropertiesSet() throws Exception {
targetRedisURI = targetRedisClientOptions.redisURI();
targetRedisClient = targetRedisClientOptions.client(targetRedisURI);
targetRedisClient = targetRedisClientOptions.redisClient();
super.afterPropertiesSet();
}

@Override
protected StandardEvaluationContext evaluationContext() {
StandardEvaluationContext context = super.evaluationContext();
context.setVariable(SOURCE_VAR, redisURI);
context.setVariable(TARGET_VAR, targetRedisURI);
context.setVariable(SOURCE_VAR, getRedisClientOptions().getRedisURI());
context.setVariable(TARGET_VAR, targetRedisClientOptions.getRedisURI());
return context;
}

Expand Down Expand Up @@ -139,7 +131,7 @@ protected Job job() {
SimpleFlow replicateFlow = flow(FLOW_REPLICATE).split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow)
.build();
JobFlowBuilder live = jobBuilder().start(replicateFlow);
if (shouldCompare()) {
if (shouldCompare() && processor == null) {
live.next(compareStep);
}
return live.build().build();
Expand All @@ -148,7 +140,7 @@ protected Job job() {
return jobBuilder().start(liveStep.build()).build();
case SNAPSHOT:
SimpleJobBuilder snapshot = jobBuilder().start(scanStep.build());
if (shouldCompare()) {
if (shouldCompare() && processor == null) {
snapshot.next(compareStep);
}
return snapshot.build();
Expand All @@ -174,7 +166,7 @@ private FlowBuilder<SimpleFlow> flow(String name) {
}

private boolean shouldCompare() {
return compareMode != CompareMode.NONE && !isDryRun() && getProcessorOptions().isEmpty();
return compareMode != CompareMode.NONE && !isDryRun();
}

@Override
Expand Down Expand Up @@ -212,7 +204,7 @@ protected <K, V, T> void configure(RedisItemReader<K, V, T> reader) {

@SuppressWarnings({ "unchecked", "rawtypes" })
private RedisItemReader<byte[], byte[], KeyValue<byte[], Object>> reader() {
if (isStruct()) {
if (type == ReplicationType.STRUCT) {
return RedisItemReader.struct(ByteArrayCodec.INSTANCE);
}
return (RedisItemReader) RedisItemReader.dump();
Expand Down Expand Up @@ -252,7 +244,7 @@ protected <K, V, T> void configure(RedisItemWriter<K, V, T> writer) {
}

private RedisItemWriter<byte[], byte[], ? extends KeyValue<byte[], ?>> writer() {
if (isStruct()) {
if (type == ReplicationType.STRUCT) {
return RedisItemWriter.struct(ByteArrayCodec.INSTANCE);
}
return RedisItemWriter.dump();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.ExportProcessorOptions;
import com.redis.riot.core.KeyValueProcessorOptions;
import com.redis.riot.core.PredicateItemProcessor;
import com.redis.riot.core.RedisClientOptions;
import com.redis.riot.core.RiotUtils;
Expand All @@ -26,6 +26,7 @@
import com.redis.spring.batch.util.Predicates;
import com.redis.testcontainers.RedisServer;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.codec.ByteArrayCodec;

Expand Down Expand Up @@ -62,7 +63,7 @@ protected void execute(Replication replication, TestInfo info) throws Exception

private RedisClientOptions redisOptions(RedisServer redis) {
RedisClientOptions options = new RedisClientOptions();
options.setUri(redis.getRedisURI());
options.setRedisURI(RedisURI.create(redis.getRedisURI()));
options.setCluster(redis.isRedisCluster());
return options;
}
Expand All @@ -88,8 +89,8 @@ void keyProcessor(TestInfo info) throws Throwable {
Assertions.assertEquals(value1, targetRedisCommands.get("string:" + key1));
}

private ExportProcessorOptions processorOptions(String keyExpression) {
ExportProcessorOptions options = new ExportProcessorOptions();
private KeyValueProcessorOptions processorOptions(String keyExpression) {
KeyValueProcessorOptions options = new KeyValueProcessorOptions();
options.setKeyExpression(RiotUtils.parseTemplate(keyExpression));
return options;
}
Expand Down
23 changes: 21 additions & 2 deletions core/riot-core/riot-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
api 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'org.springframework.boot:spring-boot-autoconfigure'
implementation 'org.hsqldb:hsqldb'
implementation 'org.apache.commons:commons-pool2'
implementation 'org.apache.commons:commons-pool2'
testImplementation 'org.awaitility:awaitility'
}

Expand All @@ -37,4 +37,23 @@ if (!(project.findProperty('automatic.module.name.skip') ?: false).toBoolean())
attributes('Automatic-Module-Name': project.findProperty('automatic.module.name'))
}
}
}
}

project.rootProject.gradle.addBuildListener(new BuildAdapter() {
@Override
void projectsEvaluated(Gradle gradle) {
gradle.rootProject.subprojects
.find { p -> p.name == 'riot-core' }
.processResources {
inputs.property('build_date', gradle.rootProject.config.buildInfo.buildDate + ':' + gradle.rootProject.config.buildInfo.buildTime)
filesMatching(['**/RiotVersion.properties']) {
expand(
'riot_version': gradle.rootProject.version,
'build_date': gradle.rootProject.config.buildInfo.buildDate,
'build_time': gradle.rootProject.config.buildInfo.buildTime,
'build_revision': gradle.rootProject.config.buildInfo.buildRevision
)
}
}
}
})
100 changes: 6 additions & 94 deletions core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java
Original file line number Diff line number Diff line change
@@ -1,108 +1,24 @@
package com.redis.riot.core;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.springframework.batch.item.ItemProcessor;

import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.util.CollectionUtils;

import com.redis.riot.core.function.CompositeOperator;
import com.redis.riot.core.function.DropStreamMessageId;
import com.redis.riot.core.function.ExpressionFunction;
import com.redis.riot.core.function.LongExpressionFunction;
import com.redis.riot.core.function.StringKeyValueFunction;
import com.redis.riot.core.function.ToStringKeyValueFunction;
import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.util.BatchUtils;
import com.redis.spring.batch.util.Predicates;

import io.lettuce.core.codec.RedisCodec;

public abstract class AbstractExport extends AbstractRedisCallable {

private RedisReaderOptions readerOptions = new RedisReaderOptions();
private KeyFilterOptions keyFilterOptions = new KeyFilterOptions();
private ExportProcessorOptions processorOptions = new ExportProcessorOptions();

protected <K> FunctionItemProcessor<KeyValue<K, Object>, KeyValue<K, Object>> processor(RedisCodec<K, ?> codec) {
if (processorOptions.isEmpty()) {
return null;
}
Function<KeyValue<K, Object>, KeyValue<String, Object>> code = new ToStringKeyValueFunction<>(codec);
Function<KeyValue<String, Object>, KeyValue<K, Object>> decode = new StringKeyValueFunction<>(codec);
CompositeOperator<KeyValue<String, Object>> operator = new CompositeOperator<>(processorConsumers());
return new FunctionItemProcessor<>(code.andThen(operator).andThen(decode));
}

private List<Consumer<KeyValue<String, Object>>> processorConsumers() {
List<Consumer<KeyValue<String, Object>>> consumers = new ArrayList<>();
if (processorOptions.getKeyExpression() != null) {
ExpressionFunction<Object, String> function = expressionFunction(
processorOptions.getKeyExpression().getExpression());
consumers.add(t -> t.setKey(function.apply(t)));
}
if (processorOptions.isDropTtl()) {
consumers.add(t -> t.setTtl(0));
}
if (processorOptions.getTtlExpression() != null) {
LongExpressionFunction<Object> function = longExpressionFunction(processorOptions.getTtlExpression());
consumers.add(t -> t.setTtl(function.applyAsLong(t)));
}
if (processorOptions.isDropStreamMessageId() && isStruct()) {
consumers.add(new DropStreamMessageId());
}
if (processorOptions.getTypeExpression() != null) {
ExpressionFunction<KeyValue<String, Object>, String> function = expressionFunction(
processorOptions.getTypeExpression());
consumers.add(t -> t.setType(function.apply(t)));
}
return consumers;
}

protected abstract boolean isStruct();
private KeyValueProcessorOptions processorOptions = new KeyValueProcessorOptions();

@Override
protected <K, V, T> void configure(RedisItemReader<K, V, T> reader) {
reader.setJobFactory(getJobFactory());
reader.setDatabase(redisURI.getDatabase());
if (!keyFilterOptions.isEmpty()) {
Predicate<K> predicate = keyFilterPredicate(reader.getCodec());
reader.setKeyProcessor(new PredicateItemProcessor<>(predicate));
}
readerOptions.configure(reader);
super.configure(reader);
}

public <K> Predicate<K> keyFilterPredicate(RedisCodec<K, ?> codec) {
return slotsPredicate(codec).and(globPredicate(codec));
}

private <K> Predicate<K> slotsPredicate(RedisCodec<K, ?> codec) {
if (CollectionUtils.isEmpty(keyFilterOptions.getSlots())) {
return Predicates.isTrue();
}
Stream<Predicate<K>> predicates = keyFilterOptions.getSlots().stream()
.map(r -> Predicates.slotRange(codec, r.getStart(), r.getEnd()));
return Predicates.or(predicates);
}

private <K> Predicate<K> globPredicate(RedisCodec<K, ?> codec) {
return Predicates.map(BatchUtils.toStringKeyFunction(codec), globPredicate());
}

private Predicate<String> globPredicate() {
Predicate<String> include = RiotUtils.globPredicate(keyFilterOptions.getIncludes());
if (CollectionUtils.isEmpty(keyFilterOptions.getExcludes())) {
return include;
}
return include.and(RiotUtils.globPredicate(keyFilterOptions.getExcludes()).negate());
}

public RedisReaderOptions getReaderOptions() {
return readerOptions;
}
Expand All @@ -111,20 +27,16 @@ public void setReaderOptions(RedisReaderOptions options) {
this.readerOptions = options;
}

public ExportProcessorOptions getProcessorOptions() {
public KeyValueProcessorOptions getProcessorOptions() {
return processorOptions;
}

public void setProcessorOptions(ExportProcessorOptions options) {
public void setProcessorOptions(KeyValueProcessorOptions options) {
this.processorOptions = options;
}

public KeyFilterOptions getKeyFilterOptions() {
return keyFilterOptions;
}

public void setKeyFilterOptions(KeyFilterOptions options) {
this.keyFilterOptions = options;
public <K> ItemProcessor<KeyValue<K, Object>, KeyValue<K, Object>> processor(RedisCodec<K, ?> codec) {
return processorOptions.processor(getEvaluationContext(), codec);
}

}
Loading

0 comments on commit 355c087

Please sign in to comment.