Skip to content

Commit

Permalink
refactor: Moved live step options to reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 4, 2023
1 parent 8415a93 commit 7f41d99
Show file tree
Hide file tree
Showing 17 changed files with 131 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected Job job(RiotContext executionContext) {
step.name(getName());
step.reader(reader());
step.writer(writer(executionContext));
return jobBuilder().start(step.build()).build();
return jobBuilder().start(step.build().build()).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Job job(RiotContext executionContext) {
step.name(getName());
step.reader(reader(executionContext));
step.writer(writer(executionContext));
return jobBuilder().start(step.build()).build();
return jobBuilder().start(step.build().build()).build();
}

private FakerItemReader reader(RiotContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected Job job(RiotContext context) {
step.reader(reader(context.getRedisContext()));
step.writer(writer());
step.processor(processor(StringCodec.UTF8, context));
return jobBuilder().start(step.build()).build();
return jobBuilder().start(step.build().build()).build();
}

private StructItemReader<String, String> reader(RedisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.core.io.Resource;

Expand Down Expand Up @@ -41,8 +42,8 @@ public void setType(FileDumpType type) {

@Override
protected Job job(RiotContext executionContext) {
Iterator<Step> steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r))
.map(StepBuilder::build).iterator();
Iterator<TaskletStep> steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r))
.map(StepBuilder::build).map(FaultTolerantStepBuilder::build).iterator();
if (!steps.hasNext()) {
throw new IllegalArgumentException("No file found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ public void setContinuationString(String continuationString) {
}

@Override
protected Job job(RiotContext executionContext) {
Iterator<Step> steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r))
.iterator();
protected Job job(RiotContext context) {
Iterator<Step> steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(context, r)).iterator();
if (!steps.hasNext()) {
throw new IllegalArgumentException("No file found");
}
Expand All @@ -156,8 +155,7 @@ protected Job job(RiotContext executionContext) {
return job.build();
}

@SuppressWarnings("unchecked")
private Step step(RiotContext executionContext, Resource resource) {
private Step step(RiotContext context, Resource resource) {
ItemReader<Map<String, Object>> reader = reader(resource);
if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) {
((AbstractItemCountingItemStreamItemReader<Map<String, Object>>) reader).setMaxItemCount(maxItemCount);
Expand All @@ -166,10 +164,11 @@ private Step step(RiotContext executionContext, Resource resource) {
StepBuilder<Map<String, Object>, Map<String, Object>> step = createStep();
step.name(name);
step.reader(reader);
step.writer(writer(executionContext));
step.processor(processor(executionContext));
step.skippableExceptions(ParseException.class);
return step.build();
step.writer(writer(context));
step.processor(processor(context));
step.addSkippableException(ParseException.class);
step.addNonRetriableException(ParseException.class);
return step.build().build();
}

private ItemReader<Map<String, Object>> reader(Resource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ private Function<KeyValue<String>, KeyValue<String>> function(EvaluationContext
protected <K, V> void configureReader(RedisItemReader<K, V, ?> reader, RedisContext context) {
reader.setChunkSize(readerOptions.getChunkSize());
reader.setDatabase(context.getUri().getDatabase());
reader.setFlushInterval(readerOptions.getFlushInterval());
reader.setIdleTimeout(readerOptions.getIdleTimeout());
reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec()));
reader.setKeyPattern(readerOptions.getKeyPattern());
reader.setKeyType(readerOptions.getKeyType());
reader.setFlushInterval(readerOptions.getFlushInterval());
reader.setIdleTimeout(readerOptions.getIdleTimeout());
if (reader instanceof KeyValueItemReader) {
KeyValueItemReader<?, ?> keyValueReader = (KeyValueItemReader<?, ?>) reader;
keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected Job job(RiotContext context) {
step.reader(reader(context.getRedisContext()));
step.writer(writer());
step.processor(processor(context));
return jobBuilder().start(step.build()).build();
return jobBuilder().start(step.build().build()).build();
}

protected StructItemReader<String, String> reader(RedisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.util.BatchUtils;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
Expand Down Expand Up @@ -98,7 +97,7 @@ public RedisURI redisURI(RedisUriOptions options) {
builder.withSsl(options.isTls());
builder.withVerifyPeer(options.getVerifyPeer());
}
if (BatchUtils.isPositive(options.getTimeout())) {
if (options.getTimeout() != null) {
builder.withTimeout(options.getTimeout());
}
return builder.build();
Expand Down Expand Up @@ -152,7 +151,7 @@ public SslOptions sslOptions(RedisSslOptions options) {

private ClientResources clientResources(RedisOptions options) {
DefaultClientResources.Builder builder = DefaultClientResources.builder();
if (BatchUtils.isPositive(options.getMetricsStep())) {
if (options.getMetricsStep() != null) {
builder.commandLatencyRecorder(commandLatencyRecorder());
builder.commandLatencyPublisherOptions(commandLatencyPublisherOptions(options.getMetricsStep()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected Job job(RiotContext context) {
StepBuilder<KeyValue<String>, KeyValue<String>> step = createStep();
step.reader(reader());
step.writer(writer(context));
return jobBuilder().start(step.build()).build();
return jobBuilder().start(step.build().build()).build();
}

private GeneratorItemReader reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.reader.KeyValueItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader.OrderingStrategy;
import com.redis.spring.batch.step.FlushingChunkProvider;

import io.lettuce.core.ReadFrom;

Expand All @@ -33,7 +34,9 @@ public class RedisReaderOptions {

public static final long DEFAULT_SCAN_COUNT = 1000;

public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL;
public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL;

public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT;

private String keyPattern;

Expand Down Expand Up @@ -63,38 +66,38 @@ public class RedisReaderOptions {

private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;

private Duration idleTimeout;
private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;

public OrderingStrategy getOrderingStrategy() {
return orderingStrategy;
public Duration getFlushInterval() {
return flushInterval;
}

public void setOrderingStrategy(OrderingStrategy orderingStrategy) {
this.orderingStrategy = orderingStrategy;
public void setFlushInterval(Duration flushInterval) {
this.flushInterval = flushInterval;
}

public int getNotificationQueueCapacity() {
return notificationQueueCapacity;
public Duration getIdleTimeout() {
return idleTimeout;
}

public void setNotificationQueueCapacity(int notificationQueueCapacity) {
this.notificationQueueCapacity = notificationQueueCapacity;
public void setIdleTimeout(Duration idleTimeout) {
this.idleTimeout = idleTimeout;
}

public Duration getFlushInterval() {
return flushInterval;
public OrderingStrategy getOrderingStrategy() {
return orderingStrategy;
}

public void setFlushInterval(Duration flushingInterval) {
this.flushInterval = flushingInterval;
public void setOrderingStrategy(OrderingStrategy orderingStrategy) {
this.orderingStrategy = orderingStrategy;
}

public Duration getIdleTimeout() {
return idleTimeout;
public int getNotificationQueueCapacity() {
return notificationQueueCapacity;
}

public void setIdleTimeout(Duration idleTimeout) {
this.idleTimeout = idleTimeout;
public void setNotificationQueueCapacity(int notificationQueueCapacity) {
this.notificationQueueCapacity = notificationQueueCapacity;
}

public String getKeyPattern() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,18 @@ protected Job job(RiotContext context) {
case COMPARE:
return jobBuilder().start(compareStep(replicationContext)).build();
case LIVE:
SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build()).build();
SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build()).build();
SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build().build()).build();
SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build().build()).build();
SimpleFlow replicateFlow = flow("replicate").split(asyncTaskExecutor()).add(liveFlow, scanFlow).build();
JobFlowBuilder live = jobBuilder().start(replicateFlow);
if (shouldCompare()) {
live.next(compareStep(replicationContext));
}
return live.build().build();
case LIVEONLY:
return jobBuilder().start(liveStep(replicationContext).build()).build();
return jobBuilder().start(liveStep(replicationContext).build().build()).build();
case SNAPSHOT:
SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build());
SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build().build());
if (shouldCompare()) {
snapshot.next(compareStep(replicationContext));
}
Expand Down Expand Up @@ -206,7 +206,7 @@ private Step compareStep(ReplicationContext context) {
step.addWriteListener(new KeyComparisonDiffLogger(out));
}
step.addExecutionListener(new KeyComparisonSummaryLogger(writer, out));
return step.build();
return step.build().build();
}

private KeyComparisonItemReader comparisonReader(ReplicationContext context) {
Expand Down

This file was deleted.

Loading

0 comments on commit 7f41d99

Please sign in to comment.