Skip to content

Commit

Permalink
fix: using default job and step names
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 23, 2024
1 parent a3425be commit 97ca2d4
Show file tree
Hide file tree
Showing 22 changed files with 436 additions and 396 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;

import com.redis.spring.batch.JobUtils;
Expand Down Expand Up @@ -83,8 +84,7 @@ private JobBuilder jobBuilder() {
@Override
protected void execute() throws Exception {
if (jobName == null) {
Assert.notNull(commandSpec, "Command spec not set");
jobName = commandSpec.name();
jobName = jobName();
}
if (jobRepository == null) {
jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
Expand All @@ -110,6 +110,13 @@ protected void execute() throws Exception {
}
}

private String jobName() {
if (commandSpec == null) {
return ClassUtils.getShortName(getClass());
}
return commandSpec.name();
}

private JobExecutionException wrapException(List<Throwable> throwables) {
if (throwables.isEmpty()) {
return new JobExecutionException("Job failed");
Expand Down Expand Up @@ -145,7 +152,7 @@ private <I, O> TaskletStep step(Step<I, O> step) {
return builder.build();
}
log.info("Adding fault-tolerance to step {}", step.getName());
FaultTolerantStepBuilder<I, O> ftStep = JobUtils.faultTolerant(builder);
FaultTolerantStepBuilder<I, O> ftStep = builder.faultTolerant();
step.getSkip().forEach(ftStep::skip);
step.getNoSkip().forEach(ftStep::noSkip);
step.getRetry().forEach(ftStep::retry);
Expand Down Expand Up @@ -181,17 +188,15 @@ private org.springframework.batch.core.step.skip.SkipPolicy skipPolicy() {

@SuppressWarnings("removal")
private <I, O> SimpleStepBuilder<I, O> simpleStep(Step<I, O> step) {
String name = jobName + "-" + step.getName();
if (name.length() >= 100) {
name = name.substring(0, 80) + "…" + name.substring(name.length() - 10);
String stepName = jobName + "-" + step.getName();
if (stepName.length() > 80) {
stepName = stepName.substring(0, 69) + "…" + stepName.substring(stepName.length() - 10);
}
if (step.getReader() instanceof ItemStreamSupport) {
ItemStreamSupport support = (ItemStreamSupport) step.getReader();
Assert.notNull(support.getName(), "No name specified for reader in step " + name);
support.setName(name + "-" + support.getName());
((ItemStreamSupport) step.getReader()).setName(stepName + "-reader");
}
log.info("Creating step {} with chunk size {}", name, stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = new StepBuilder(name, jobRepository).<I, O>chunk(stepArgs.getChunkSize(),
log.info("Creating step {} with chunk size {}", stepName, stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = new StepBuilder(stepName, jobRepository).<I, O>chunk(stepArgs.getChunkSize(),
transactionManager);
builder.reader(reader(step));
builder.writer(writer(step));
Expand Down
11 changes: 8 additions & 3 deletions core/riot-core/src/main/java/com/redis/riot/core/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public class Step<I, O> {

private static final long NO_MAX_ITEM_COUNT = -1;
private static final String EMPTY_STRING = "";
public static final String DEFAULT_NAME = "step";

protected final String name;
private String name = DEFAULT_NAME;
private final ItemReader<I> reader;
private final ItemWriter<O> writer;
private String taskName;
Expand All @@ -40,8 +41,7 @@ public class Step<I, O> {
private Collection<Class<? extends Throwable>> retry = new HashSet<>();
private Collection<Class<? extends Throwable>> noRetry = new HashSet<>();

public Step(String name, ItemReader<I> reader, ItemWriter<O> writer) {
this.name = name;
public Step(ItemReader<I> reader, ItemWriter<O> writer) {
this.reader = reader;
this.writer = writer;
}
Expand All @@ -50,6 +50,11 @@ public String getName() {
return name;
}

public Step<I, O> name(String name) {
this.name = name;
return this;
}

public String getTaskName() {
return taskName;
}
Expand Down
190 changes: 0 additions & 190 deletions plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public abstract class AbstractExportCommand extends AbstractJobCommand {
public static final String NOTIFY_CONFIG_VALUE = "KEA";

private static final String TASK_NAME = "Exporting";
private static final String STEP_NAME = "step";
private static final String VAR_SOURCE = "source";

@ArgGroup(exclusive = false)
Expand All @@ -51,7 +50,7 @@ protected void configure(StandardEvaluationContext context) {
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
configureAsyncReader(reader);
sourceRedisContext.configure(reader);
log.info("Configuring source Redis reader with {}", sourceRedisReaderArgs);
log.info("Configuring {} with {}", reader.getName(), sourceRedisReaderArgs);
sourceRedisReaderArgs.configure(reader);
}

Expand All @@ -60,20 +59,20 @@ protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
protected <O> Step<KeyValue<String>, O> step(ItemWriter<O> writer) {
RedisItemReader<String, String> reader = RedisItemReader.struct();
configureSourceRedisReader(reader);
Step<KeyValue<String>, O> step = step(STEP_NAME, reader, writer);
Step<KeyValue<String>, O> step = step(reader, writer);
step.taskName(TASK_NAME);
return step;
}

protected <K, V, T, O> Step<KeyValue<K>, O> step(String name, RedisItemReader<K, V> reader, ItemWriter<O> writer) {
Step<KeyValue<K>, O> step = new Step<>(name, reader, writer);
protected <K, V, T, O> Step<KeyValue<K>, O> step(RedisItemReader<K, V> reader, ItemWriter<O> writer) {
Step<KeyValue<K>, O> step = new Step<>(reader, writer);
if (reader.getMode() != ReaderMode.LIVEONLY) {
log.info("Configuring step with scan size estimator");
step.maxItemCountSupplier(reader.scanSizeEstimator());
}
if (reader.getMode() != ReaderMode.SCAN) {
checkNotifyConfig(reader.getClient());
log.info("Configuring step {} with live true, flushInterval {}, idleTimeout {}", name,
log.info("Configuring export step with live true, flushInterval {}, idleTimeout {}",
reader.getFlushInterval(), reader.getIdleTimeout());
step.live(true);
step.flushInterval(reader.getFlushInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
public abstract class AbstractImportCommand extends AbstractJobCommand {

private static final String TASK_NAME = "Importing";
private static final String STEP_NAME = "step";
public static final String VAR_REDIS = "redis";

@ArgGroup(exclusive = false)
Expand Down Expand Up @@ -79,7 +78,7 @@ protected Step<Map<String, Object>, Map<String, Object>> step(ItemReader<Map<Str
Assert.isTrue(hasOperations(), "No Redis command specified");
RedisItemWriter<String, String, Map<String, Object>> writer = operationWriter();
configureTargetRedisWriter(writer);
Step<Map<String, Object>, Map<String, Object>> step = new Step<>(STEP_NAME, reader, writer);
Step<Map<String, Object>, Map<String, Object>> step = new Step<>(reader, writer);
step.processor(processor());
step.taskName(TASK_NAME);
return step;
Expand Down
Loading

0 comments on commit 97ca2d4

Please sign in to comment.