Skip to content

Commit

Permalink
feat: Added progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 3, 2023
1 parent c363cab commit 87e4ef5
Show file tree
Hide file tree
Showing 39 changed files with 359 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;

public abstract class AbstractExport extends AbstractJobExecutable {

Expand All @@ -26,6 +27,10 @@ public void setRedisReaderOptions(RedisReaderOptions options) {
this.redisReaderOptions = options;
}

protected RedisItemReader<String, String> reader() {
return reader(StringCodec.UTF8);
}

protected <K, V> RedisItemReader<K, V> reader(RedisCodec<K, V> codec) {
return reader(client, codec, redisReaderOptions);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package com.redis.riot.core;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
Expand All @@ -35,11 +39,17 @@ public abstract class AbstractJobExecutable implements Executable {

private String name;

private List<Consumer<StepBuilder<?, ?>>> stepConsumers = new ArrayList<>();

protected AbstractJobExecutable(AbstractRedisClient client) {
setName(ClassUtils.getShortName(getClass()));
this.client = client;
}

public void addStepConsumer(Consumer<StepBuilder<?, ?>> consumer) {
stepConsumers.add(consumer);
}

public String getName() {
return name;
}
Expand All @@ -64,17 +74,13 @@ public void execute() {
throw new RiotExecutionException("Could not initialize job repository", e);
}
jobFactory = new JobBuilderFactory(jobRepository);
stepFactory = new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager());
Job job = job();
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
stepFactory = stepBuilderFactory();
JobExecution execution;
try {
execution = jobLauncher.run(job, new JobParameters());
execution = jobLauncher().run(job(), new JobParameters());
} catch (JobExecutionException e) {
// Should not happen but handle anyway
throw new RiotExecutionException(MessageFormat.format("Could not run job {0}", job.getName()), e);
throw new RiotExecutionException("Could not run job", e);
}
if (execution.getStatus().isUnsuccessful()) {
List<Throwable> exceptions = execution.getAllFailureExceptions();
Expand All @@ -86,6 +92,17 @@ public void execute() {
}
}

private StepBuilderFactory stepBuilderFactory() {
return new StepBuilderFactory(jobRepository, new ResourcelessTransactionManager());
}

private JobLauncher jobLauncher() {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.setTaskExecutor(new SyncTaskExecutor());
return launcher;
}

protected JobBuilder jobBuilder() {
return jobFactory.get(name);
}
Expand All @@ -96,4 +113,9 @@ protected ReaderStepBuilder step(String name) {
return StepBuilder.factory(stepFactory).name(name).options(stepOptions);
}

protected Step build(StepBuilder<?, ?> step) {
stepConsumers.forEach(c -> c.accept(step));
return step.build().build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.springframework.batch.core.Job;

import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.gen.CollectionOptions;
import com.redis.spring.batch.gen.DataType;
import com.redis.spring.batch.gen.GeneratorItemReader;
Expand Down Expand Up @@ -156,7 +157,8 @@ public void setTypes(List<DataType> types) {

@Override
protected Job job() {
return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build();
StepBuilder<KeyValue<String>, KeyValue<String>> step = step(getName()).reader(reader()).writer(writer());
return jobBuilder().start(build(step)).build();
}

private GeneratorItemReader reader() {
Expand Down
56 changes: 46 additions & 10 deletions core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.redis.riot.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
Expand All @@ -22,6 +25,7 @@
import org.springframework.classify.BinaryExceptionClassifier;

import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.util.BatchUtils;

import io.lettuce.core.RedisException;
Expand All @@ -38,10 +42,16 @@ public class StepBuilder<I, O> {

private ItemProcessor<I, O> processor;

private List<Object> listeners = new ArrayList<>();
private List<StepExecutionListener> executionListeners = new ArrayList<>();

private List<ItemWriteListener<O>> writeListeners = new ArrayList<>();

private StepOptions options = new StepOptions();

private Duration flushingInterval;

private Duration idleTimeout;

private Collection<Class<? extends Throwable>> skippableExceptions = new ArrayList<>();

private Collection<Class<? extends Throwable>> retriableExceptions = defaultRetriableExceptions();
Expand All @@ -61,6 +71,14 @@ private StepBuilder(StepBuilderFactory factory, String name, ItemReader<I> reade
this.writer = writer;
}

public String getName() {
return name;
}

public ItemReader<I> getReader() {
return reader;
}

@SuppressWarnings("unchecked")
public StepBuilder<I, O> skippableExceptions(Class<? extends Throwable>... exceptions) {
this.skippableExceptions = Arrays.asList(exceptions);
Expand All @@ -78,16 +96,29 @@ public StepBuilder<I, O> options(StepOptions options) {
return this;
}

public StepBuilder<I, O> listeners(Object... listeners) {
this.listeners = Arrays.asList(listeners);
return this;
public void addWriteListener(ItemWriteListener<O> listener) {
this.writeListeners.add(listener);
}

public void addExecutionListener(StepExecutionListener listener) {
this.executionListeners.add(listener);
}

public StepBuilder<I, O> processor(ItemProcessor<I, O> processor) {
this.processor = processor;
return this;
}

public StepBuilder<I, O> flushingInterval(Duration interval) {
this.flushingInterval = interval;
return this;
}

public StepBuilder<I, O> idleTimeout(Duration timeout) {
this.idleTimeout = timeout;
return this;
}

public SimpleStepBuilder<I, O> build() {
SimpleStepBuilder<I, O> step = factory.get(name).chunk(options.getChunkSize());
step.reader(reader());
Expand All @@ -97,17 +128,22 @@ public SimpleStepBuilder<I, O> build() {
step.taskExecutor(BatchUtils.threadPoolTaskExecutor(options.getThreads()));
step.throttleLimit(options.getThreads());
}
listeners.forEach(step::listener);
executionListeners.forEach(step::listener);
writeListeners.forEach(step::listener);
if (BatchUtils.isPositive(flushingInterval)) {
step = new FlushingStepBuilder<>(step).interval(flushingInterval).idleTimeout(idleTimeout);
}
if (options.isFaultTolerance()) {
FaultTolerantStepBuilder<I, O> ftStep = step.faultTolerant();
ftStep.skipPolicy(skipPolicy());
ftStep.retryLimit(options.getRetryLimit());
retriableExceptions.forEach(ftStep::retry);
return ftStep;
step = retry(step.faultTolerant().skipPolicy(skipPolicy()).retryLimit(options.getRetryLimit()));
}
return step;
}

private FaultTolerantStepBuilder<I, O> retry(FaultTolerantStepBuilder<I, O> step) {
retriableExceptions.forEach(step::retry);
return step;
}

private ItemReader<I> reader() {
if (reader instanceof RedisItemReader) {
return reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected Job job() {
StepBuilder<KeyValue<String>, Map<String, Object>> step = step(getName()).reader(reader(StringCodec.UTF8))
.writer(writer());
step.processor(processor());
return jobBuilder().start(step.build().build()).build();
return jobBuilder().start(build(step)).build();
}

private JdbcBatchItemWriter<Map<String, Object>> writer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.jdbc.core.ColumnMapRowMapper;

import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.StepBuilder;

import io.lettuce.core.AbstractRedisClient;

Expand Down Expand Up @@ -103,7 +104,8 @@ public void setVerifyCursorPosition(boolean verifyCursorPosition) {

@Override
protected Job job() {
return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build();
StepBuilder<Map<String, Object>, Map<String, Object>> step = step(getName()).reader(reader()).writer(writer());
return jobBuilder().start(build(step)).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.riot.core.AbstractMapImport;
import com.redis.riot.core.SpelUtils;
import com.redis.riot.core.StepBuilder;
import com.redis.spring.batch.util.IntRange;

import io.lettuce.core.AbstractRedisClient;
Expand Down Expand Up @@ -58,7 +59,8 @@ public void setLocale(Locale locale) {

@Override
protected Job job() {
return jobBuilder().start(step(getName()).reader(reader()).writer(writer()).build().build()).build();
StepBuilder<Map<String, Object>, Map<String, Object>> step = step(getName()).reader(reader()).writer(writer());
return jobBuilder().start(build(step)).build();
}

private FakerItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.redis.riot.core.AbstractExport;
import com.redis.riot.core.RiotExecutionException;
import com.redis.riot.core.StepBuilder;
import com.redis.riot.core.file.resource.JsonResourceItemWriter;
import com.redis.riot.core.file.resource.JsonResourceItemWriterBuilder;
import com.redis.riot.core.file.resource.XmlResourceItemWriter;
Expand All @@ -20,7 +21,6 @@
import com.redis.spring.batch.ValueType;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.StringCodec;

public class FileDumpExport extends AbstractExport {

Expand Down Expand Up @@ -99,7 +99,8 @@ public FileDumpExport(AbstractRedisClient client, String file) {

@Override
protected Job job() {
return jobBuilder().start(step(getName()).reader(reader(StringCodec.UTF8)).writer(writer()).build().build()).build();
StepBuilder<KeyValue<String>, KeyValue<String>> step = step(getName()).reader(reader()).writer(writer());
return jobBuilder().start(build(step)).build();
}

private ItemWriter<KeyValue<String>> writer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ protected Job job() {
return job.build();
}

public Step step(Resource resource) {
return step(resource.getDescription()).reader(reader(resource)).writer(writer()).processor(processor()).build().build();
private Step step(Resource resource) {
return build(step(resource.getDescription()).reader(reader(resource)).writer(writer()).processor(processor()));
}

private ItemProcessor<KeyValue<String>, KeyValue<String>> processor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private Step step(Resource resource) {
StepBuilder<Map<String, Object>, Map<String, Object>> step = step(name).reader(reader).writer(writer());
step.processor(processor());
step.skippableExceptions(ParseException.class);
return step.build().build();
return build(step);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ protected Function<Map<String, Object>, String> idFunction(String prefix, List<S
}

public Operation<String, String, Map<String, Object>> build() {
AbstractOperation<String, String, Map<String, Object>, ?> operation = operation();
operation.key(idFunction(keyspace, keys));
AbstractOperation<String, String, Map<String, Object>> operation = operation();
operation.setKey(idFunction(keyspace, keys));
return operation;
}

protected abstract AbstractOperation<String, String, Map<String, Object>, ?> operation();
protected abstract AbstractOperation<String, String, Map<String, Object>> operation();

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class DelBuilder extends AbstractMapOperationBuilder<DelBuilder> {

@Override
protected AbstractOperation<String, String, Map<String, Object>, ?> operation() {
protected AbstractOperation<String, String, Map<String, Object>> operation() {
return new Del<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ public ExpireAtBuilder ttl(String field) {

@Override
protected ExpireAt<String, String, Map<String, Object>> operation() {
return new ExpireAt<String, String, Map<String, Object>>().epoch(toLong(ttl, 0));
ExpireAt<String, String, Map<String, Object>> operation = new ExpireAt<>();
operation.setEpoch(toLong(ttl, 0));
return operation;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public ExpireBuilder defaultTtl(Duration duration) {

@Override
protected Expire<String, String, Map<String, Object>> operation() {
return new Expire<String, String, Map<String, Object>>().ttl(ttl());
Expire<String, String, Map<String, Object>> operation = new Expire<>();
operation.setTtl(ttl());
return operation;
}

private Function<Map<String, Object>, Duration> ttl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public GeoaddBuilder longitude(String field) {

@Override
protected Geoadd<String, String, Map<String, Object>> operation() {
return new Geoadd<String, String, Map<String, Object>>().value(geoValue());
Geoadd<String, String, Map<String, Object>> operation = new Geoadd<>();
operation.setValue(geoValue());
return operation;
}

private ToGeoValueFunction<String, Map<String, Object>> geoValue() {
Expand Down
Loading

0 comments on commit 87e4ef5

Please sign in to comment.