diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index a04d56ac8..76448ea33 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -9,6 +9,7 @@ import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; public abstract class AbstractExport extends AbstractJobExecutable { @@ -26,6 +27,10 @@ public void setRedisReaderOptions(RedisReaderOptions options) { this.redisReaderOptions = options; } + protected RedisItemReader reader() { + return reader(StringCodec.UTF8); + } + protected RedisItemReader reader(RedisCodec codec) { return reader(client, codec, redisReaderOptions); } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobExecutable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobExecutable.java index 665c14a18..2917b48fc 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobExecutable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobExecutable.java @@ -1,15 +1,19 @@ package com.redis.riot.core; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; @@ -35,11 +39,17 @@ public abstract class AbstractJobExecutable implements Executable { private String name; + private List>> stepConsumers = new ArrayList<>(); + protected AbstractJobExecutable(AbstractRedisClient client) { setName(ClassUtils.getShortName(getClass())); this.client = client; } + public void addStepConsumer(Consumer> consumer) { + stepConsumers.add(consumer); + } + public String getName() { return name; } @@ -64,17 +74,13 @@ public void execute() { throw new RiotExecutionException("Could not initialize job repository", e); } jobFactory = new JobBuilderFactory(jobRepository); - stepFactory = new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager()); - Job job = job(); - SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); - jobLauncher.setJobRepository(jobRepository); - jobLauncher.setTaskExecutor(new SyncTaskExecutor()); + stepFactory = stepBuilderFactory(); JobExecution execution; try { - execution = jobLauncher.run(job, new JobParameters()); + execution = jobLauncher().run(job(), new JobParameters()); } catch (JobExecutionException e) { // Should not happen but handle anyway - throw new RiotExecutionException(MessageFormat.format("Could not run job {0}", job.getName()), e); + throw new RiotExecutionException("Could not run job", e); } if (execution.getStatus().isUnsuccessful()) { List exceptions = execution.getAllFailureExceptions(); @@ -86,6 +92,17 @@ public void execute() { } } + private StepBuilderFactory stepBuilderFactory() { + return new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager()); + } + + private JobLauncher jobLauncher() { + SimpleJobLauncher launcher = new SimpleJobLauncher(); + launcher.setJobRepository(jobRepository); + launcher.setTaskExecutor(new SyncTaskExecutor()); + return launcher; + } + protected JobBuilder jobBuilder() { return jobFactory.get(name); } @@ -96,4 +113,9 @@ protected ReaderStepBuilder step(String name) { return StepBuilder.factory(stepFactory).name(name).options(stepOptions); } + protected Step build(StepBuilder step) { + stepConsumers.forEach(c -> c.accept(step)); + return step.build().build(); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java index 2c8285871..38ed430f6 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java @@ -4,6 +4,7 @@ import org.springframework.batch.core.Job; +import com.redis.spring.batch.KeyValue; import com.redis.spring.batch.gen.CollectionOptions; import com.redis.spring.batch.gen.DataType; import com.redis.spring.batch.gen.GeneratorItemReader; @@ -156,7 +157,8 @@ public void setTypes(List types) { @Override protected Job job() { - return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build(); + StepBuilder, KeyValue> step = step(getName()).reader(reader()).writer(writer()); + return jobBuilder().start(build(step)).build(); } private GeneratorItemReader reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java index 195dfe7b1..e58ce26c7 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java @@ -1,5 +1,6 @@ package com.redis.riot.core; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -7,6 +8,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; @@ -22,6 +25,7 @@ import org.springframework.classify.BinaryExceptionClassifier; import com.redis.spring.batch.RedisItemReader; +import com.redis.spring.batch.step.FlushingStepBuilder; import com.redis.spring.batch.util.BatchUtils; import io.lettuce.core.RedisException; @@ -38,10 +42,16 @@ public class StepBuilder { private ItemProcessor processor; - private List listeners = new ArrayList<>(); + private List executionListeners = new ArrayList<>(); + + private List> writeListeners = new ArrayList<>(); private StepOptions options = new StepOptions(); + private Duration flushingInterval; + + private Duration idleTimeout; + private Collection> skippableExceptions = new ArrayList<>(); private Collection> retriableExceptions = defaultRetriableExceptions(); @@ -61,6 +71,14 @@ private StepBuilder(StepBuilderFactory factory, String name, ItemReader reade this.writer = writer; } + public String getName() { + return name; + } + + public ItemReader getReader() { + return reader; + } + @SuppressWarnings("unchecked") public StepBuilder skippableExceptions(Class... exceptions) { this.skippableExceptions = Arrays.asList(exceptions); @@ -78,9 +96,12 @@ public StepBuilder options(StepOptions options) { return this; } - public StepBuilder listeners(Object... listeners) { - this.listeners = Arrays.asList(listeners); - return this; + public void addWriteListener(ItemWriteListener listener) { + this.writeListeners.add(listener); + } + + public void addExecutionListener(StepExecutionListener listener) { + this.executionListeners.add(listener); } public StepBuilder processor(ItemProcessor processor) { @@ -88,6 +109,16 @@ public StepBuilder processor(ItemProcessor processor) { return this; } + public StepBuilder flushingInterval(Duration interval) { + this.flushingInterval = interval; + return this; + } + + public StepBuilder idleTimeout(Duration timeout) { + this.idleTimeout = timeout; + return this; + } + public SimpleStepBuilder build() { SimpleStepBuilder step = factory.get(name).chunk(options.getChunkSize()); step.reader(reader()); @@ -97,17 +128,22 @@ public SimpleStepBuilder build() { step.taskExecutor(BatchUtils.threadPoolTaskExecutor(options.getThreads())); step.throttleLimit(options.getThreads()); } - listeners.forEach(step::listener); + executionListeners.forEach(step::listener); + writeListeners.forEach(step::listener); + if (BatchUtils.isPositive(flushingInterval)) { + step = new FlushingStepBuilder<>(step).interval(flushingInterval).idleTimeout(idleTimeout); + } if (options.isFaultTolerance()) { - FaultTolerantStepBuilder ftStep = step.faultTolerant(); - ftStep.skipPolicy(skipPolicy()); - ftStep.retryLimit(options.getRetryLimit()); - retriableExceptions.forEach(ftStep::retry); - return ftStep; + step = retry(step.faultTolerant().skipPolicy(skipPolicy()).retryLimit(options.getRetryLimit())); } return step; } + private FaultTolerantStepBuilder retry(FaultTolerantStepBuilder step) { + retriableExceptions.forEach(step::retry); + return step; + } + private ItemReader reader() { if (reader instanceof RedisItemReader) { return reader; diff --git a/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseExport.java b/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseExport.java index f1872864b..936148402 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseExport.java @@ -70,7 +70,7 @@ protected Job job() { StepBuilder, Map> step = step(getName()).reader(reader(StringCodec.UTF8)) .writer(writer()); step.processor(processor()); - return jobBuilder().start(step.build().build()).build(); + return jobBuilder().start(build(step)).build(); } private JdbcBatchItemWriter> writer() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseImport.java b/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseImport.java index de92b4bb5..4094702a3 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/db/DatabaseImport.java @@ -13,6 +13,7 @@ import org.springframework.jdbc.core.ColumnMapRowMapper; import com.redis.riot.core.AbstractMapImport; +import com.redis.riot.core.StepBuilder; import io.lettuce.core.AbstractRedisClient; @@ -103,7 +104,8 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) { @Override protected Job job() { - return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build(); + StepBuilder, Map> step = step(getName()).reader(reader()).writer(writer()); + return jobBuilder().start(build(step)).build(); } private ItemReader> reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/faker/FakerImport.java b/core/riot-core/src/main/java/com/redis/riot/core/faker/FakerImport.java index e4fb3116a..abd6c67f9 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/faker/FakerImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/faker/FakerImport.java @@ -15,6 +15,7 @@ import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.SpelUtils; +import com.redis.riot.core.StepBuilder; import com.redis.spring.batch.util.IntRange; import io.lettuce.core.AbstractRedisClient; @@ -58,7 +59,8 @@ public void setLocale(Locale locale) { @Override protected Job job() { - return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build(); + StepBuilder, Map> step = step(getName()).reader(reader()).writer(writer()); + return jobBuilder().start(build(step)).build(); } private FakerItemReader reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpExport.java b/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpExport.java index 5052ddc1c..df6f85e9b 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpExport.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.redis.riot.core.AbstractExport; import com.redis.riot.core.RiotExecutionException; +import com.redis.riot.core.StepBuilder; import com.redis.riot.core.file.resource.JsonResourceItemWriter; import com.redis.riot.core.file.resource.JsonResourceItemWriterBuilder; import com.redis.riot.core.file.resource.XmlResourceItemWriter; @@ -20,7 +21,6 @@ import com.redis.spring.batch.ValueType; import io.lettuce.core.AbstractRedisClient; -import io.lettuce.core.codec.StringCodec; public class FileDumpExport extends AbstractExport { @@ -99,7 +99,8 @@ public FileDumpExport(AbstractRedisClient client, String file) { @Override protected Job job() { - return jobBuilder().start(step(getName()).reader(reader(StringCodec.UTF8)).writer(writer()).build().build()).build(); + StepBuilder, KeyValue> step = step(getName()).reader(reader()).writer(writer()); + return jobBuilder().start(build(step)).build(); } private ItemWriter> writer() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpImport.java b/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpImport.java index 48e164ad4..7debe606d 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/file/FileDumpImport.java @@ -64,8 +64,8 @@ protected Job job() { return job.build(); } - public Step step(Resource resource) { - return step(resource.getDescription()).reader(reader(resource)).writer(writer()).processor(processor()).build().build(); + private Step step(Resource resource) { + return build(step(resource.getDescription()).reader(reader(resource)).writer(writer()).processor(processor())); } private ItemProcessor, KeyValue> processor() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/file/FileImport.java b/core/riot-core/src/main/java/com/redis/riot/core/file/FileImport.java index 68363f824..8aa3d5365 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/file/FileImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/file/FileImport.java @@ -200,7 +200,7 @@ private Step step(Resource resource) { StepBuilder, Map> step = step(name).reader(reader).writer(writer()); step.processor(processor()); step.skippableExceptions(ParseException.class); - return step.build().build(); + return build(step); } @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/AbstractMapOperationBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/AbstractMapOperationBuilder.java index 80d347b78..230d54fe0 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/AbstractMapOperationBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/AbstractMapOperationBuilder.java @@ -94,11 +94,11 @@ protected Function, String> idFunction(String prefix, List> build() { - AbstractOperation, ?> operation = operation(); - operation.key(idFunction(keyspace, keys)); + AbstractOperation> operation = operation(); + operation.setKey(idFunction(keyspace, keys)); return operation; } - protected abstract AbstractOperation, ?> operation(); + protected abstract AbstractOperation> operation(); } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/DelBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/DelBuilder.java index f50a350a5..e03aeeea5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/DelBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/DelBuilder.java @@ -8,7 +8,7 @@ public class DelBuilder extends AbstractMapOperationBuilder { @Override - protected AbstractOperation, ?> operation() { + protected AbstractOperation> operation() { return new Del<>(); } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java index 380541e07..012b0d3b4 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java @@ -15,7 +15,9 @@ public ExpireAtBuilder ttl(String field) { @Override protected ExpireAt> operation() { - return new ExpireAt>().epoch(toLong(ttl, 0)); + ExpireAt> operation = new ExpireAt<>(); + operation.setEpoch(toLong(ttl, 0)); + return operation; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireBuilder.java index c1522f88e..5397b1d5f 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireBuilder.java @@ -25,7 +25,9 @@ public ExpireBuilder defaultTtl(Duration duration) { @Override protected Expire> operation() { - return new Expire>().ttl(ttl()); + Expire> operation = new Expire<>(); + operation.setTtl(ttl()); + return operation; } private Function, Duration> ttl() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/GeoaddBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/GeoaddBuilder.java index 193c4ed08..d71813e70 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/GeoaddBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/GeoaddBuilder.java @@ -24,7 +24,9 @@ public GeoaddBuilder longitude(String field) { @Override protected Geoadd> operation() { - return new Geoadd>().value(geoValue()); + Geoadd> operation = new Geoadd<>(); + operation.setValue(geoValue()); + return operation; } private ToGeoValueFunction> geoValue() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/HsetBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/HsetBuilder.java index ae793da6a..a36600347 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/HsetBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/HsetBuilder.java @@ -8,7 +8,9 @@ public class HsetBuilder extends AbstractFilterMapOperationBuilder @Override protected Hset> operation() { - return new Hset>().map(map()); + Hset> operation = new Hset<>(); + operation.setMap(map()); + return operation; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/JsonSetBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/JsonSetBuilder.java index 74f02cea6..e864ff683 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/JsonSetBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/JsonSetBuilder.java @@ -23,7 +23,10 @@ public JsonSetBuilder path(String path) { @Override protected JsonSet> operation() { - return new JsonSet>().value(this::value).path(path()); + JsonSet> operation = new JsonSet<>(); + operation.setValue(this::value); + operation.setPath(path()); + return operation; } private Function, String> path() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/LpushBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/LpushBuilder.java index 7d2daea34..c65a39464 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/LpushBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/LpushBuilder.java @@ -8,7 +8,9 @@ public class LpushBuilder extends AbstractCollectionMapOperationBuilder> operation() { - return new Lpush>().value(member()); + Lpush> operation = new Lpush<>(); + operation.setValue(member()); + return operation; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/RpushBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/RpushBuilder.java index 1591cce23..9cdfb4fb5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/RpushBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/RpushBuilder.java @@ -8,7 +8,9 @@ public class RpushBuilder extends AbstractCollectionMapOperationBuilder> operation() { - return new Rpush>().value(member()); + Rpush> operation = new Rpush<>(); + operation.setValue(member()); + return operation; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/SaddBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/SaddBuilder.java index 6398dce14..2486645f0 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/SaddBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/SaddBuilder.java @@ -8,7 +8,9 @@ public class SaddBuilder extends AbstractCollectionMapOperationBuilder> operation() { - return new Sadd>().value(member()); + Sadd> operation = new Sadd<>(); + operation.setValue(member()); + return operation; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/SetBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/SetBuilder.java index 078a98a8e..fd98af8d1 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/SetBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/SetBuilder.java @@ -41,7 +41,9 @@ public SetBuilder root(String root) { @Override protected Set> operation() { - return new Set>().value(value()); + Set> operation = new Set<>(); + operation.setValue(value()); + return operation; } private Function, String> value() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/SugaddBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/SugaddBuilder.java index 8ab3d5fe4..b87220434 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/SugaddBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/SugaddBuilder.java @@ -51,7 +51,10 @@ public SugaddBuilder increment(boolean increment) { @Override protected Sugadd> operation() { - return new Sugadd>().suggestion(suggestion()).incr(increment); + Sugadd> operation = new Sugadd<>(); + operation.setSuggestion(suggestion()); + operation.setIncr(increment); + return operation; } private Function, Suggestion> suggestion() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/TsAddBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/TsAddBuilder.java index 515015a82..da5a8d869 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/TsAddBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/TsAddBuilder.java @@ -32,7 +32,10 @@ public class TsAddBuilder extends AbstractMapOperationBuilder { @Override protected TsAdd> operation() { - return new TsAdd>().sample(sample()).options(this::addOptions); + TsAdd> operation = new TsAdd<>(); + operation.setSample(sample()); + operation.setOptions(this::addOptions); + return operation; } private Function, Sample> sample() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/XaddSupplier.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/XaddSupplier.java index 7b6e714a9..4a9645192 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/XaddSupplier.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/XaddSupplier.java @@ -24,7 +24,10 @@ public XaddSupplier approximateTrimming(boolean approximateTrimming) { @Override public Xadd> operation() { - return new Xadd>().body(map()).args(args()); + Xadd> operation = new Xadd<>(); + operation.setBody(map()); + operation.setArgs(args()); + return operation; } private XAddArgs args() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/ZaddSupplier.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/ZaddSupplier.java index 3c4d13378..4e6f2bbc8 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/ZaddSupplier.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/ZaddSupplier.java @@ -16,7 +16,9 @@ public class ZaddSupplier extends AbstractCollectionMapOperationBuilder> operation() { - return new Zadd>().value(value()); + Zadd> operation = new Zadd<>(); + operation.setValue(value()); + return operation; } private ToScoredValueFunction> value() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/replicate/Replication.java b/core/riot-core/src/main/java/com/redis/riot/core/replicate/Replication.java index 79262efa7..f808720df 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/replicate/Replication.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/replicate/Replication.java @@ -5,12 +5,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobFlowBuilder; import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.job.flow.support.SimpleFlow; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.function.FunctionItemProcessor; @@ -29,7 +28,6 @@ import com.redis.spring.batch.RedisItemReader.Mode; import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.ValueType; -import com.redis.spring.batch.step.FlushingStepBuilder; import com.redis.spring.batch.util.KeyComparisonItemReader; import io.lettuce.core.AbstractRedisClient; @@ -43,6 +41,12 @@ public class Replication extends AbstractExport { public static final ValueType DEFAULT_VALUE_TYPE = ValueType.DUMP; + public static final String STEP_LIVE = "live"; + + public static final String STEP_SCAN = "scan"; + + public static final String STEP_COMPARE = "compare"; + private final Logger log = LoggerFactory.getLogger(Replication.class); private final AbstractRedisClient targetClient; @@ -105,8 +109,8 @@ protected Job job() { case COMPARE: return jobBuilder().start(compareStep()).build(); case LIVE: - SimpleFlow scanFlow = flow("scan").start(scanStep().build()).build(); - SimpleFlow liveFlow = flow("live").start(liveStep().build()).build(); + SimpleFlow scanFlow = flow("scan").start(build(scanStep())).build(); + SimpleFlow liveFlow = flow("live").start(build(liveStep())).build(); SimpleFlow replicateFlow = flow("replicate").split(asyncTaskExecutor()).add(liveFlow, scanFlow).build(); JobFlowBuilder live = jobBuilder().start(replicateFlow); if (shouldCompare()) { @@ -114,9 +118,9 @@ protected Job job() { } return live.build().build(); case LIVEONLY: - return jobBuilder().start(liveStep().build()).build(); + return jobBuilder().start(build(liveStep())).build(); case SNAPSHOT: - SimpleJobBuilder snapshot = jobBuilder().start(scanStep().build()); + SimpleJobBuilder snapshot = jobBuilder().start(build(scanStep())); if (shouldCompare()) { snapshot.next(compareStep()); } @@ -138,27 +142,26 @@ private boolean shouldCompare() { return !noVerify && !getStepOptions().isDryRun() && processorOptions.isEmpty(); } - private SimpleStepBuilder, KeyValue> scanStep() { - return step("scan", reader(ByteArrayCodec.INSTANCE)); + private StepBuilder, KeyValue> scanStep() { + return step(STEP_SCAN, reader(ByteArrayCodec.INSTANCE)); } - private SimpleStepBuilder, KeyValue> step(String name, RedisItemReader reader) { + private StepBuilder, KeyValue> step(String name, RedisItemReader reader) { reader.setName(name + "-reader"); StepBuilder, KeyValue> step = step(name).reader(reader).writer(writer()); step.processor(processor()); if (log.isDebugEnabled()) { - step.listeners(new KeyValueWriteListener(log)); + step.addWriteListener(new KeyValueWriteListener(log)); } - return step.build(); + return step; } - private FlushingStepBuilder, KeyValue> liveStep() { + private StepBuilder, KeyValue> liveStep() { checkKeyspaceNotificationsConfig(); RedisItemReader reader = reader(ByteArrayCodec.INSTANCE); reader.setMode(Mode.LIVE); - SimpleStepBuilder, KeyValue> riotStep = step("live", reader); - FlushingStepBuilder, KeyValue> step = new FlushingStepBuilder<>(riotStep); - step.interval(getRedisReaderOptions().getFlushingInterval()); + StepBuilder, KeyValue> step = step(STEP_LIVE, reader); + step.flushingInterval(getRedisReaderOptions().getFlushingInterval()); step.idleTimeout(getRedisReaderOptions().getIdleTimeout()); return step; } @@ -177,13 +180,13 @@ private void checkKeyspaceNotificationsConfig() { } } - private TaskletStep compareStep() { + private Step compareStep() { KeyComparisonItemReader reader = new KeyComparisonItemReader(reader(StringCodec.UTF8), reader(targetClient, StringCodec.UTF8, targetReaderOptions)); reader.setTtlTolerance(ttlTolerance); reader.setName("compare-reader"); KeyComparisonStatusCountItemWriter writer = new KeyComparisonStatusCountItemWriter(); - return step("compare").reader(reader).writer(writer).build().build(); + return build(step(STEP_COMPARE).reader(reader).writer(writer)); } private ItemWriter> writer() { diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java index 973542447..b80542c45 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java @@ -2,6 +2,8 @@ import com.redis.riot.core.AbstractExport; import com.redis.riot.core.AbstractJobExecutable; +import com.redis.riot.core.StepBuilder; +import com.redis.spring.batch.util.BatchUtils; import picocli.CommandLine.ArgGroup; @@ -19,4 +21,14 @@ protected AbstractJobExecutable getJobExecutable() { protected abstract AbstractExport getExportExecutable(); + @Override + protected long size(StepBuilder step) { + return BatchUtils.size(step.getReader()); + } + + @Override + protected String taskName(StepBuilder step) { + return "Exporting"; + } + } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java index c0838111d..bfec71b63 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java @@ -20,6 +20,7 @@ import com.redis.riot.cli.operation.XaddCommand; import com.redis.riot.cli.operation.ZaddCommand; import com.redis.riot.core.AbstractMapImport; +import com.redis.riot.core.StepBuilder; import com.redis.spring.batch.writer.Operation; import picocli.CommandLine.ArgGroup; @@ -61,4 +62,9 @@ protected AbstractMapImport getJobExecutable() { protected abstract AbstractMapImport getMapImportExecutable(); + @Override + protected String taskName(StepBuilder step) { + return "Importing"; + } + } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java index a0d45140b..28eebc79c 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractJobCommand.java @@ -1,69 +1,43 @@ package com.redis.riot.cli; +import com.redis.riot.cli.common.ProgressStepListener; import com.redis.riot.core.AbstractJobExecutable; import com.redis.riot.core.Executable; +import com.redis.riot.core.StepBuilder; import io.lettuce.core.AbstractRedisClient; +import me.tongfei.progressbar.ProgressBarBuilder; import picocli.CommandLine.ArgGroup; abstract class AbstractJobCommand extends AbstractCommand { - // ${DEFAULT-VALUE}),", paramLabel = "