From 7f41d99d638a5c884c3a57beac0d6082035466f6 Mon Sep 17 00:00:00 2001 From: jruaux Date: Wed, 4 Oct 2023 11:01:04 -0700 Subject: [PATCH] refactor: Moved live step options to reader --- .../com/redis/riot/db/DatabaseImport.java | 2 +- .../com/redis/riot/faker/FakerImport.java | 2 +- .../com/redis/riot/file/FileDumpExport.java | 2 +- .../com/redis/riot/file/FileDumpImport.java | 7 +- .../java/com/redis/riot/file/FileImport.java | 17 +- .../com/redis/riot/core/AbstractExport.java | 4 +- .../redis/riot/core/AbstractMapExport.java | 2 +- .../redis/riot/core/AbstractRiotRunnable.java | 5 +- .../com/redis/riot/core/GeneratorImport.java | 2 +- .../redis/riot/core/RedisReaderOptions.java | 39 ++--- .../java/com/redis/riot/core/Replication.java | 10 +- .../com/redis/riot/core/RiotSkipPolicy.java | 7 - .../java/com/redis/riot/core/StepBuilder.java | 145 +++++++++--------- .../java/com/redis/riot/core/StepOptions.java | 38 ++--- .../redis/riot/core/ThrottledItemWriter.java | 4 +- .../java/com/redis/riot/cli/LoggingMixin.java | 3 + .../java/com/redis/riot/cli/StepArgs.java | 6 - 17 files changed, 131 insertions(+), 164 deletions(-) delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/RiotSkipPolicy.java diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java index 94815a47f..cf2a2938c 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java @@ -108,7 +108,7 @@ protected Job job(RiotContext executionContext) { step.name(getName()); step.reader(reader()); step.writer(writer(executionContext)); - return jobBuilder().start(step.build()).build(); + return jobBuilder().start(step.build().build()).build(); } private ItemReader> reader() { diff --git a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java index ff6b5e6b3..6c18941b3 100644 --- a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java +++ b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java @@ -73,7 +73,7 @@ protected Job job(RiotContext executionContext) { step.name(getName()); step.reader(reader(executionContext)); step.writer(writer(executionContext)); - return jobBuilder().start(step.build()).build(); + return jobBuilder().start(step.build().build()).build(); } private FakerItemReader reader(RiotContext executionContext) { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java index e4b54e2c0..e80460caa 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java @@ -134,7 +134,7 @@ protected Job job(RiotContext context) { step.reader(reader(context.getRedisContext())); step.writer(writer()); step.processor(processor(StringCodec.UTF8, context)); - return jobBuilder().start(step.build()).build(); + return jobBuilder().start(step.build().build()).build(); } private StructItemReader reader(RedisContext context) { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java index 2f150f116..5cf4cd4b7 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java @@ -5,8 +5,9 @@ import java.util.List; import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemReader; import org.springframework.core.io.Resource; @@ -41,8 +42,8 @@ public void setType(FileDumpType type) { @Override protected Job job(RiotContext executionContext) { - Iterator steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r)) - .map(StepBuilder::build).iterator(); + Iterator steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r)) + .map(StepBuilder::build).map(FaultTolerantStepBuilder::build).iterator(); if (!steps.hasNext()) { throw new IllegalArgumentException("No file found"); } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java index ef6ef0449..e7dddacc8 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java @@ -143,9 +143,8 @@ public void setContinuationString(String continuationString) { } @Override - protected Job job(RiotContext executionContext) { - Iterator steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(executionContext, r)) - .iterator(); + protected Job job(RiotContext context) { + Iterator steps = FileUtils.inputResources(files, fileOptions).stream().map(r -> step(context, r)).iterator(); if (!steps.hasNext()) { throw new IllegalArgumentException("No file found"); } @@ -156,8 +155,7 @@ protected Job job(RiotContext executionContext) { return job.build(); } - @SuppressWarnings("unchecked") - private Step step(RiotContext executionContext, Resource resource) { + private Step step(RiotContext context, Resource resource) { ItemReader> reader = reader(resource); if (maxItemCount != null && reader instanceof AbstractItemCountingItemStreamItemReader) { ((AbstractItemCountingItemStreamItemReader>) reader).setMaxItemCount(maxItemCount); @@ -166,10 +164,11 @@ private Step step(RiotContext executionContext, Resource resource) { StepBuilder, Map> step = createStep(); step.name(name); step.reader(reader); - step.writer(writer(executionContext)); - step.processor(processor(executionContext)); - step.skippableExceptions(ParseException.class); - return step.build(); + step.writer(writer(context)); + step.processor(processor(context)); + step.addSkippableException(ParseException.class); + step.addNonRetriableException(ParseException.class); + return step.build().build(); } private ItemReader> reader(Resource resource) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index 65efa0979..8cf2d4529 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -72,11 +72,11 @@ private Function, KeyValue> function(EvaluationContext protected void configureReader(RedisItemReader reader, RedisContext context) { reader.setChunkSize(readerOptions.getChunkSize()); reader.setDatabase(context.getUri().getDatabase()); - reader.setFlushInterval(readerOptions.getFlushInterval()); - reader.setIdleTimeout(readerOptions.getIdleTimeout()); reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec())); reader.setKeyPattern(readerOptions.getKeyPattern()); reader.setKeyType(readerOptions.getKeyType()); + reader.setFlushInterval(readerOptions.getFlushInterval()); + reader.setIdleTimeout(readerOptions.getIdleTimeout()); if (reader instanceof KeyValueItemReader) { KeyValueItemReader keyValueReader = (KeyValueItemReader) reader; keyValueReader.setMemoryUsageLimit(readerOptions.getMemoryUsageLimit()); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java index 9424dda94..4d5012a2f 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java @@ -33,7 +33,7 @@ protected Job job(RiotContext context) { step.reader(reader(context.getRedisContext())); step.writer(writer()); step.processor(processor(context)); - return jobBuilder().start(step.build()).build(); + return jobBuilder().start(step.build().build()).build(); } protected StructItemReader reader(RedisContext context) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java index d0a071072..defef7a76 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotRunnable.java @@ -10,7 +10,6 @@ import com.redis.lettucemod.RedisModulesClient; import com.redis.lettucemod.cluster.RedisModulesClusterClient; -import com.redis.spring.batch.util.BatchUtils; import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; @@ -98,7 +97,7 @@ public RedisURI redisURI(RedisUriOptions options) { builder.withSsl(options.isTls()); builder.withVerifyPeer(options.getVerifyPeer()); } - if (BatchUtils.isPositive(options.getTimeout())) { + if (options.getTimeout() != null) { builder.withTimeout(options.getTimeout()); } return builder.build(); @@ -152,7 +151,7 @@ public SslOptions sslOptions(RedisSslOptions options) { private ClientResources clientResources(RedisOptions options) { DefaultClientResources.Builder builder = DefaultClientResources.builder(); - if (BatchUtils.isPositive(options.getMetricsStep())) { + if (options.getMetricsStep() != null) { builder.commandLatencyRecorder(commandLatencyRecorder()); builder.commandLatencyPublisherOptions(commandLatencyPublisherOptions(options.getMetricsStep())); } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java index 493997adf..8f446a6e0 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/GeneratorImport.java @@ -154,7 +154,7 @@ protected Job job(RiotContext context) { StepBuilder, KeyValue> step = createStep(); step.reader(reader()); step.writer(writer(context)); - return jobBuilder().start(step.build()).build(); + return jobBuilder().start(step.build().build()).build(); } private GeneratorItemReader reader() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java index 7a1e74abb..cb29290b5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java @@ -8,6 +8,7 @@ import com.redis.spring.batch.common.DataType; import com.redis.spring.batch.reader.KeyValueItemReader; import com.redis.spring.batch.reader.KeyspaceNotificationItemReader.OrderingStrategy; +import com.redis.spring.batch.step.FlushingChunkProvider; import io.lettuce.core.ReadFrom; @@ -33,7 +34,9 @@ public class RedisReaderOptions { public static final long DEFAULT_SCAN_COUNT = 1000; - public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL; + public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL; + + public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT; private String keyPattern; @@ -63,38 +66,38 @@ public class RedisReaderOptions { private Duration flushInterval = DEFAULT_FLUSH_INTERVAL; - private Duration idleTimeout; + private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT; - public OrderingStrategy getOrderingStrategy() { - return orderingStrategy; + public Duration getFlushInterval() { + return flushInterval; } - public void setOrderingStrategy(OrderingStrategy orderingStrategy) { - this.orderingStrategy = orderingStrategy; + public void setFlushInterval(Duration flushInterval) { + this.flushInterval = flushInterval; } - public int getNotificationQueueCapacity() { - return notificationQueueCapacity; + public Duration getIdleTimeout() { + return idleTimeout; } - public void setNotificationQueueCapacity(int notificationQueueCapacity) { - this.notificationQueueCapacity = notificationQueueCapacity; + public void setIdleTimeout(Duration idleTimeout) { + this.idleTimeout = idleTimeout; } - public Duration getFlushInterval() { - return flushInterval; + public OrderingStrategy getOrderingStrategy() { + return orderingStrategy; } - public void setFlushInterval(Duration flushingInterval) { - this.flushInterval = flushingInterval; + public void setOrderingStrategy(OrderingStrategy orderingStrategy) { + this.orderingStrategy = orderingStrategy; } - public Duration getIdleTimeout() { - return idleTimeout; + public int getNotificationQueueCapacity() { + return notificationQueueCapacity; } - public void setIdleTimeout(Duration idleTimeout) { - this.idleTimeout = idleTimeout; + public void setNotificationQueueCapacity(int notificationQueueCapacity) { + this.notificationQueueCapacity = notificationQueueCapacity; } public String getKeyPattern() { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java index 7850a6baf..f52551767 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/Replication.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/Replication.java @@ -111,8 +111,8 @@ protected Job job(RiotContext context) { case COMPARE: return jobBuilder().start(compareStep(replicationContext)).build(); case LIVE: - SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build()).build(); - SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build()).build(); + SimpleFlow scanFlow = flow("scan").start(scanStep(replicationContext).build().build()).build(); + SimpleFlow liveFlow = flow("live").start(liveStep(replicationContext).build().build()).build(); SimpleFlow replicateFlow = flow("replicate").split(asyncTaskExecutor()).add(liveFlow, scanFlow).build(); JobFlowBuilder live = jobBuilder().start(replicateFlow); if (shouldCompare()) { @@ -120,9 +120,9 @@ protected Job job(RiotContext context) { } return live.build().build(); case LIVEONLY: - return jobBuilder().start(liveStep(replicationContext).build()).build(); + return jobBuilder().start(liveStep(replicationContext).build().build()).build(); case SNAPSHOT: - SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build()); + SimpleJobBuilder snapshot = jobBuilder().start(scanStep(replicationContext).build().build()); if (shouldCompare()) { snapshot.next(compareStep(replicationContext)); } @@ -206,7 +206,7 @@ private Step compareStep(ReplicationContext context) { step.addWriteListener(new KeyComparisonDiffLogger(out)); } step.addExecutionListener(new KeyComparisonSummaryLogger(writer, out)); - return step.build(); + return step.build().build(); } private KeyComparisonItemReader comparisonReader(ReplicationContext context) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotSkipPolicy.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotSkipPolicy.java deleted file mode 100644 index 5a0dc8e41..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/RiotSkipPolicy.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.redis.riot.core; - -public enum RiotSkipPolicy { - - ALWAYS, NEVER, LIMIT - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java index cd7b3c61c..2ebf166af 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/StepBuilder.java @@ -1,36 +1,27 @@ package com.redis.riot.core; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.springframework.batch.core.ItemWriteListener; -import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; -import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; -import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; -import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.SynchronizedItemStreamReader; import org.springframework.beans.factory.InitializingBean; -import org.springframework.classify.BinaryExceptionClassifier; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.step.FlushingStepBuilder; import com.redis.spring.batch.util.BatchUtils; -import io.lettuce.core.RedisException; +import io.lettuce.core.RedisCommandExecutionException; +import io.lettuce.core.RedisCommandTimeoutException; public class StepBuilder { @@ -50,28 +41,51 @@ public class StepBuilder { private StepOptions options = new StepOptions(); - private Duration flushingInterval; + private List configurationStrategies = new ArrayList<>(); - private Duration idleTimeout; + private List> skippableExceptions = defaultNonRetriableExceptions(); - private Collection> skippableExceptions = new ArrayList<>(); + private List> nonSkippableExceptions = defaultRetriableExceptions(); - private Collection> retriableExceptions = defaultRetriableExceptions(); + private List> retriableExceptions = defaultRetriableExceptions(); - private List configurationStrategies = new ArrayList<>(); + private List> nonRetriableExceptions = defaultNonRetriableExceptions(); + + @SuppressWarnings("unchecked") + public static List> defaultRetriableExceptions() { + return modifiableList(RedisCommandTimeoutException.class); + } - public static Collection> defaultRetriableExceptions() { - Collection> exceptions = new ArrayList<>(); - exceptions.add(RedisException.class); - exceptions.add(TimeoutException.class); - exceptions.add(ExecutionException.class); - return exceptions; + @SuppressWarnings("unchecked") + public static List> defaultNonRetriableExceptions() { + return modifiableList(RedisCommandExecutionException.class); + } + + @SuppressWarnings("unchecked") + private static List modifiableList(T... elements) { + return new ArrayList<>(Arrays.asList(elements)); } public StepBuilder(StepBuilderFactory factory) { this.factory = factory; } + public void addSkippableException(Class exception) { + skippableExceptions.add(exception); + } + + public void addNonSkippableException(Class exception) { + nonSkippableExceptions.add(exception); + } + + public void addRetriableException(Class exception) { + retriableExceptions.add(exception); + } + + public void addNonRetriableException(Class exception) { + nonRetriableExceptions.add(exception); + } + public String getName() { return name; } @@ -99,18 +113,6 @@ public StepBuilder name(String name) { return this; } - @SuppressWarnings("unchecked") - public StepBuilder skippableExceptions(Class... exceptions) { - this.skippableExceptions = Arrays.asList(exceptions); - return this; - } - - @SuppressWarnings("unchecked") - public StepBuilder retriableExceptions(Class... exceptions) { - this.retriableExceptions = Arrays.asList(exceptions); - return this; - } - public StepBuilder options(StepOptions options) { this.options = options; return this; @@ -133,24 +135,11 @@ public StepBuilder processor(ItemProcessor processor) { return this; } - public StepBuilder flushingInterval(Duration interval) { - this.flushingInterval = interval; - return this; - } - - public StepBuilder idleTimeout(Duration timeout) { - this.idleTimeout = timeout; - return this; - } - - public Step build() { + public FaultTolerantStepBuilder build() { configurationStrategies.forEach(s -> s.configure(this)); - initialize(reader); - initialize(processor); - initialize(writer); - SimpleStepBuilder step = factory.get(name).chunk(options.getChunkSize()); + FaultTolerantStepBuilder step = simpleStep().faultTolerant(); step.reader(reader()); - step.processor(processor); + step.processor(processor()); step.writer(writer()); if (options.getThreads() > 1) { step.taskExecutor(BatchUtils.threadPoolTaskExecutor(options.getThreads())); @@ -158,13 +147,32 @@ public Step build() { } executionListeners.forEach(step::listener); writeListeners.forEach(step::listener); - if (BatchUtils.isPositive(flushingInterval)) { - step = new FlushingStepBuilder<>(step).interval(flushingInterval).idleTimeout(idleTimeout); - } - if (options.isFaultTolerance()) { - step = retry(step.faultTolerant().skipPolicy(skipPolicy()).retryLimit(options.getRetryLimit())); + step.skipLimit(options.getSkipLimit()); + step.retryLimit(options.getRetryLimit()); + skippableExceptions.forEach(step::skip); + nonSkippableExceptions.forEach(step::noSkip); + retriableExceptions.forEach(step::retry); + nonRetriableExceptions.forEach(step::noRetry); + return step; + } + + private SimpleStepBuilder simpleStep() { + SimpleStepBuilder step = factory.get(name).chunk(options.getChunkSize()); + if (reader instanceof RedisItemReader) { + RedisItemReader redisReader = (RedisItemReader) reader; + if (redisReader.isLive()) { + FlushingStepBuilder flushingStep = new FlushingStepBuilder<>(step); + flushingStep.interval(redisReader.getFlushInterval()); + flushingStep.idleTimeout(redisReader.getIdleTimeout()); + return flushingStep; + } } - return step.build(); + return step; + } + + private ItemProcessor processor() { + initialize(processor); + return processor; } private void initialize(Object object) { @@ -177,12 +185,8 @@ private void initialize(Object object) { } } - private FaultTolerantStepBuilder retry(FaultTolerantStepBuilder step) { - retriableExceptions.forEach(step::retry); - return step; - } - private ItemReader reader() { + initialize(reader); if (reader instanceof RedisItemReader) { return reader; } @@ -195,13 +199,14 @@ private ItemReader reader() { } private ItemWriter writer() { + initialize(writer); if (options.isDryRun()) { return new NoopItemWriter<>(); } - if (BatchUtils.isPositive(options.getSleep())) { - return new ThrottledItemWriter<>(writer, options.getSleep()); + if (options.getSleep() == null || options.getSleep().isNegative() || options.getSleep().isZero()) { + return writer; } - return writer; + return new ThrottledItemWriter<>(writer, options.getSleep()); } private static class NoopItemWriter implements ItemWriter { @@ -213,18 +218,6 @@ public void write(List items) throws Exception { } - private SkipPolicy skipPolicy() { - switch (options.getSkipPolicy()) { - case ALWAYS: - return new AlwaysSkipItemSkipPolicy(); - case NEVER: - return new NeverSkipItemSkipPolicy(); - default: - return new LimitCheckingItemSkipPolicy(options.getSkipLimit(), - new BinaryExceptionClassifier(skippableExceptions)); - } - } - public StepBuilder configurationStrategies(List strategies) { this.configurationStrategies = strategies; return this; diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java index 3da1360ab..582e87375 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/StepOptions.java @@ -2,32 +2,32 @@ import java.time.Duration; +import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; +import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.retry.policy.MaxAttemptsRetryPolicy; + public class StepOptions { - public static final RiotSkipPolicy DEFAULT_SKIP_POLICY = RiotSkipPolicy.LIMIT; + public static final SkipPolicy DEFAULT_SKIP_POLICY = new NeverSkipItemSkipPolicy(); - public static final int DEFAULT_CHUNK_SIZE = 50; + public static final int DEFAULT_SKIP_LIMIT = 0; - public static final int DEFAULT_THREADS = 1; + public static final int DEFAULT_RETRY_LIMIT = MaxAttemptsRetryPolicy.DEFAULT_MAX_ATTEMPTS; - public static final int DEFAULT_SKIP_LIMIT = 3; + public static final Duration DEFAULT_SLEEP = Duration.ZERO; - public static final int DEFAULT_RETRY_LIMIT = 1; + public static final int DEFAULT_CHUNK_SIZE = 50; - public static final int DEFAULT_PROGRESS_UPDATE_INTERVAL = 1000; + public static final int DEFAULT_THREADS = 1; private int threads = DEFAULT_THREADS; private int chunkSize = DEFAULT_CHUNK_SIZE; - private Duration sleep; + private Duration sleep = DEFAULT_SLEEP; private boolean dryRun; - private boolean faultTolerance; - - private RiotSkipPolicy skipPolicy = DEFAULT_SKIP_POLICY; - private int skipLimit = DEFAULT_SKIP_LIMIT; private int retryLimit = DEFAULT_RETRY_LIMIT; @@ -64,22 +64,6 @@ public void setSleep(Duration sleep) { this.sleep = sleep; } - public boolean isFaultTolerance() { - return faultTolerance; - } - - public void setFaultTolerance(boolean faultTolerance) { - this.faultTolerance = faultTolerance; - } - - public RiotSkipPolicy getSkipPolicy() { - return skipPolicy; - } - - public void setSkipPolicy(RiotSkipPolicy skipPolicy) { - this.skipPolicy = skipPolicy; - } - public int getSkipLimit() { return skipLimit; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java b/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java index 877c1eaa8..09ff3f349 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/ThrottledItemWriter.java @@ -11,8 +11,6 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; -import com.redis.spring.batch.util.BatchUtils; - public class ThrottledItemWriter extends AbstractItemStreamItemWriter { private final ItemWriter delegate; @@ -23,7 +21,7 @@ public ThrottledItemWriter(ItemWriter delegate, Duration sleepDuration) { setName(ClassUtils.getShortName(getClass())); Assert.notNull(delegate, "Delegate must not be null"); Assert.notNull(sleepDuration, "Sleep duration must not be null"); - Assert.isTrue(BatchUtils.isPositive(sleepDuration), "Sleep duration must be strictly positive"); + Assert.isTrue(!sleepDuration.isNegative() && !sleepDuration.isZero(), "Sleep duration must be strictly positive"); this.delegate = delegate; this.sleep = sleepDuration.toMillis(); } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java b/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java index 40cb26cf6..ca7608a5b 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/LoggingMixin.java @@ -28,6 +28,8 @@ public class LoggingMixin { private static final String NETTY = "io.netty"; + private static final String CHUNK_MONITOR = "org.springframework.batch.core.step.item.ChunkMonitor"; + @Spec(Target.MIXEE) private CommandSpec mixee; @@ -139,6 +141,7 @@ private static Map defaultMinLogLevels() { logs.put(AWS, Level.ERROR); logs.put(LETTUCE, Level.INFO); logs.put(NETTY, Level.INFO); + logs.put(CHUNK_MONITOR, Level.ERROR); return logs; } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/StepArgs.java b/plugins/riot/src/main/java/com/redis/riot/cli/StepArgs.java index f3069dd47..503c61b3e 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/StepArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/StepArgs.java @@ -2,7 +2,6 @@ import java.time.Duration; -import com.redis.riot.core.RiotSkipPolicy; import com.redis.riot.core.StepOptions; import picocli.CommandLine.Option; @@ -25,9 +24,6 @@ public class StepArgs { @Option(names = "--ft", description = "Enable step fault-tolerance. Use in conjunction with retry and skip limit/policy.") boolean faultTolerance; - @Option(names = "--skip-policy", description = "Policy to determine if some processing should be skipped: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") - RiotSkipPolicy skipPolicy = StepOptions.DEFAULT_SKIP_POLICY; - @Option(names = "--skip-limit", description = "LIMIT skip policy: max number of failed items before considering the transfer has failed (default: ${DEFAULT-VALUE}).", paramLabel = "") int skipLimit = StepOptions.DEFAULT_SKIP_LIMIT; @@ -38,10 +34,8 @@ public StepOptions stepOptions() { StepOptions options = new StepOptions(); options.setChunkSize(chunkSize); options.setDryRun(dryRun); - options.setFaultTolerance(faultTolerance); options.setRetryLimit(retryLimit); options.setSkipLimit(skipLimit); - options.setSkipPolicy(skipPolicy); options.setSleep(Duration.ofMillis(sleep)); options.setThreads(threads); return options;