diff --git a/VERSION b/VERSION index cf28a128f..66129c395 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.14.3 +2.14.4-SNAPSHOT diff --git a/build.gradle b/build.gradle index 0bfe7a7a7..6e1798494 100644 --- a/build.gradle +++ b/build.gradle @@ -74,18 +74,12 @@ subprojects { license { exclude('build/**') } - + dependencies { - testImplementation "org.junit.jupiter:junit-jupiter" - testImplementation "org.junit.jupiter:junit-jupiter-params" - testImplementation group: 'commons-io', name: 'commons-io', version: commonsIoVersion - testImplementation group: 'org.codehaus.plexus', name: 'plexus-utils', version:plexusVersion - testImplementation group: 'com.redis.testcontainers', name: 'testcontainers-redis', version: testcontainersRedisVersion - testImplementation group: 'com.redis.testcontainers', name: 'junit-jupiter', version: testcontainersVersion - - testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine" + compileOnly group: 'com.google.code.findbugs', name: 'jsr305', version: jsr305Version + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' } - + test { useJUnitPlatform() } diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java index aa1a0835f..7a6d337af 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseExportCommand.java @@ -7,7 +7,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; @@ -41,12 +42,12 @@ public DataSourceOptions getDataSourceOptions() { } @Override - protected Flow flow() throws Exception { - log.info("Creating data source: {}", dataSourceOptions); + protected Job job(JobBuilder jobBuilder) throws Exception { + log.debug("Creating data source with {}", dataSourceOptions); DataSource dataSource = dataSourceOptions.dataSource(); try (Connection connection = dataSource.getConnection()) { String dbName = connection.getMetaData().getDatabaseProductName(); - log.info("Creating {} database writer: {}", dbName, exportOptions); + log.debug("Creating writer for database {} with {}", dbName, exportOptions); JdbcBatchItemWriterBuilder> builder = new JdbcBatchItemWriterBuilder<>(); builder.itemSqlParameterSourceProvider(NullableMapSqlParameterSource::new); builder.dataSource(dataSource); @@ -55,7 +56,8 @@ protected Flow flow() throws Exception { JdbcBatchItemWriter> writer = builder.build(); writer.afterPropertiesSet(); DataStructureItemProcessor processor = DataStructureItemProcessor.of(exportOptions.getKeyRegex()); - return flow(NAME, step(NAME, String.format("Exporting to %s", dbName), processor, writer).build()); + return jobBuilder.start(step(NAME, String.format("Exporting to %s", dbName), processor, writer).build()) + .build(); } } diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImportCommand.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImportCommand.java index e2006be14..3f0bac1f0 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImportCommand.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImportCommand.java @@ -7,7 +7,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.item.database.JdbcCursorItemReader; import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.jdbc.core.ColumnMapRowMapper; @@ -37,7 +38,7 @@ public DataSourceOptions getDataSourceOptions() { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { log.debug("Creating data source: {}", dataSourceOptions); DataSource dataSource = dataSourceOptions.dataSource(); try (Connection connection = dataSource.getConnection()) { @@ -62,7 +63,7 @@ protected Flow flow() throws Exception { builder.verifyCursorPosition(importOptions.isVerifyCursorPosition()); JdbcCursorItemReader> reader = builder.build(); reader.afterPropertiesSet(); - return flow(NAME, step(NAME, "Importing from " + name, reader).build()); + return jobBuilder.start(step(NAME, "Importing from " + name, reader).build()).build(); } } diff --git a/connectors/riot-db/src/test/java/com/redis/riot/db/AbstractDatabaseTests.java b/connectors/riot-db/src/test/java/com/redis/riot/db/AbstractDatabaseTests.java index b1c0083e1..04dd809d1 100644 --- a/connectors/riot-db/src/test/java/com/redis/riot/db/AbstractDatabaseTests.java +++ b/connectors/riot-db/src/test/java/com/redis/riot/db/AbstractDatabaseTests.java @@ -1,11 +1,13 @@ package com.redis.riot.db; -import com.redis.riot.AbstractRiotIntegrationTests; +import javax.sql.DataSource; + import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.testcontainers.containers.JdbcDatabaseContainer; -import picocli.CommandLine; -import javax.sql.DataSource; +import com.redis.riot.AbstractRiotIntegrationTests; + +import picocli.CommandLine; public abstract class AbstractDatabaseTests extends AbstractRiotIntegrationTests { diff --git a/connectors/riot-db/src/test/java/com/redis/riot/db/PostgreSQLTests.java b/connectors/riot-db/src/test/java/com/redis/riot/db/PostgreSQLTests.java index db7edf442..d19b7f134 100644 --- a/connectors/riot-db/src/test/java/com/redis/riot/db/PostgreSQLTests.java +++ b/connectors/riot-db/src/test/java/com/redis/riot/db/PostgreSQLTests.java @@ -28,8 +28,8 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.redis.lettucemod.api.sync.RedisModulesCommands; import com.redis.spring.batch.support.generator.Generator.Type; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; @Testcontainers @SuppressWarnings({ "rawtypes", "unchecked" }) diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java index 1f9e10e69..e1e0c3966 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/DumpFileImportCommand.java @@ -3,13 +3,16 @@ import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JsonItemReader; import org.springframework.batch.item.support.AbstractItemStreamItemReader; @@ -63,23 +66,28 @@ public RedisWriterOptions getWriterOptions() { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { Assert.isTrue(!ObjectUtils.isEmpty(files), "No file specified"); List expandedFiles = FileUtils.expand(files); if (ObjectUtils.isEmpty(expandedFiles)) { throw new FileNotFoundException("File not found: " + String.join(", ", files)); } - List steps = new ArrayList<>(); - for (String file : expandedFiles) { - DumpFileType fileType = fileType(file); - Resource resource = options.inputResource(file); - AbstractItemStreamItemReader> reader = reader(fileType, resource); - reader.setName(file + "-" + NAME + "-reader"); - RiotStepBuilder, DataStructure> step = riotStep(file + "-" + NAME, - "Importing " + file); - steps.add(step.reader(reader).processor(this::processDataStructure).writer(writer()).build().build()); + Iterator fileIterator = expandedFiles.iterator(); + SimpleJobBuilder simpleJobBuilder = jobBuilder.start(fileImportStep(fileIterator.next())); + while (fileIterator.hasNext()) { + simpleJobBuilder.next(fileImportStep(fileIterator.next())); } - return flow(NAME, steps.toArray(new Step[0])); + return simpleJobBuilder.build(); + } + + private TaskletStep fileImportStep(String file) throws Exception { + DumpFileType fileType = fileType(file); + Resource resource = options.inputResource(file); + AbstractItemStreamItemReader> reader = reader(fileType, resource); + reader.setName(file + "-" + NAME + "-reader"); + RiotStepBuilder, DataStructure> step = riotStep(file + "-" + NAME, + "Importing " + file); + return step.reader(reader).processor(this::processDataStructure).writer(writer()).build().build(); } @SuppressWarnings("unchecked") diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java index 34984b5c8..9dcae9cfe 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileExportCommand.java @@ -2,7 +2,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -42,10 +43,10 @@ public FileExportOptions getOptions() { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { WritableResource resource = options.outputResource(file); String taskName = String.format("Exporting %s", resource.getFilename()); - return flow(NAME, step(NAME, taskName, writer(resource)).build()); + return jobBuilder.start(step(NAME, taskName, writer(resource)).build()).build(); } private ItemWriter> writer(WritableResource resource) { diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java index 712e0bd4d..f3c167550 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImportCommand.java @@ -4,14 +4,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.job.flow.Flow; -import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.FlatFileParseException; import org.springframework.batch.item.file.LineCallbackHandler; @@ -78,27 +80,30 @@ public FileImportOptions getOptions() { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { Assert.isTrue(!ObjectUtils.isEmpty(files), "No file specified"); List expandedFiles = FileUtils.expand(files); if (ObjectUtils.isEmpty(expandedFiles)) { throw new FileNotFoundException("File not found: " + String.join(", ", files)); } - List steps = new ArrayList<>(); - for (String file : expandedFiles) { - FileType fileType = type(file); - if (fileType == null) { - throw new IllegalArgumentException("Could not determine type of file " + file); - } - Resource resource = options.inputResource(file); - AbstractItemStreamItemReader> reader = reader(file, fileType, resource); - reader.setName(file + "-" + NAME + "-reader"); - FaultTolerantStepBuilder, Map> step = step(file + "-" + NAME, - "Importing " + resource.getFilename(), reader); - step.skip(FlatFileParseException.class); - steps.add(step.build()); + Iterator fileIterator = expandedFiles.iterator(); + SimpleJobBuilder simpleJobBuilder = jobBuilder.start(fileImportStep(fileIterator.next())); + while (fileIterator.hasNext()) { + simpleJobBuilder.next(fileImportStep(fileIterator.next())); + } + return simpleJobBuilder.build(); + } + + private TaskletStep fileImportStep(String file) throws Exception { + FileType fileType = type(file); + if (fileType == null) { + throw new IllegalArgumentException("Could not determine type of file " + file); } - return flow(NAME, steps.toArray(new Step[0])); + Resource resource = options.inputResource(file); + AbstractItemStreamItemReader> reader = reader(file, fileType, resource); + reader.setName(file + "-" + NAME + "-reader"); + return step(file + "-" + NAME, "Importing " + resource.getFilename(), reader).skip(FlatFileParseException.class) + .build(); } private FileType type(String file) { diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileModulesTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileModulesTests.java index 4526753a7..bdae4f912 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileModulesTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileModulesTests.java @@ -17,8 +17,8 @@ import com.redis.riot.file.RiotFile; import com.redis.testcontainers.RedisModulesContainer; import com.redis.testcontainers.RedisServer; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; @SuppressWarnings("unchecked") @Testcontainers diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileRedisTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileRedisTests.java index b5040459b..0b76ff419 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileRedisTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/test/RiotFileRedisTests.java @@ -36,8 +36,8 @@ import com.redis.riot.file.resource.XmlObjectReader; import com.redis.riot.redis.HsetCommand; import com.redis.spring.batch.support.DataStructure; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; import io.lettuce.core.GeoArgs; import io.lettuce.core.api.sync.RedisGeoCommands; diff --git a/connectors/riot-gen/src/main/java/com/redis/riot/gen/GeneratorImportCommand.java b/connectors/riot-gen/src/main/java/com/redis/riot/gen/GeneratorImportCommand.java index 2890ad37d..971e82c5c 100644 --- a/connectors/riot-gen/src/main/java/com/redis/riot/gen/GeneratorImportCommand.java +++ b/connectors/riot-gen/src/main/java/com/redis/riot/gen/GeneratorImportCommand.java @@ -5,7 +5,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemReader; import com.redis.lettucemod.RedisModulesUtils; @@ -30,8 +32,9 @@ public class GeneratorImportCommand extends AbstractImportCommand { private GenerateOptions options = new GenerateOptions(); @Override - protected Flow flow() throws Exception { - return flow(NAME, step(NAME, "Generating", reader()).build()); + protected Job job(JobBuilder jobBuilder) throws Exception { + TaskletStep step = step(NAME, "Generating", reader()).build(); + return jobBuilder.start(step).build(); } private ItemReader> reader() { diff --git a/connectors/riot-gen/src/test/java/com/redis/riot/gen/TestGen.java b/connectors/riot-gen/src/test/java/com/redis/riot/gen/TestGen.java index 237a30e20..890df3bab 100644 --- a/connectors/riot-gen/src/test/java/com/redis/riot/gen/TestGen.java +++ b/connectors/riot-gen/src/test/java/com/redis/riot/gen/TestGen.java @@ -15,12 +15,12 @@ import com.redis.lettucemod.search.Document; import com.redis.lettucemod.search.Field; import com.redis.lettucemod.search.Field.TextField.PhoneticMatcher; -import com.redis.lettucemod.search.SearchResults; import com.redis.riot.AbstractRiotIntegrationTests; +import com.redis.lettucemod.search.SearchResults; import com.redis.testcontainers.RedisModulesContainer; import com.redis.testcontainers.RedisServer; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; import io.lettuce.core.Range; import io.lettuce.core.StreamMessage; diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommandCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommandCommand.java index 8444f47da..f4b3f8a5d 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommandCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractRedisCommandCommand.java @@ -1,6 +1,8 @@ package com.redis.riot.redis; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.repeat.RepeatStatus; import com.redis.lettucemod.api.StatefulRedisModulesConnection; @@ -13,15 +15,15 @@ public abstract class AbstractRedisCommandCommand extends AbstractRiotCommand { @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { String name = name(); - return flow(name, step(name).tasklet((contribution, chunkContext) -> { + Step step = step(name).tasklet((contribution, chunkContext) -> { try (StatefulRedisModulesConnection connection = getRedisOptions().connect()) { - RedisModulesCommands commands = connection.sync(); - execute(commands); + execute(connection.sync()); return RepeatStatus.FINISHED; } - }).build()); + }).build(); + return jobBuilder.start(step).build(); } protected abstract void execute(RedisModulesCommands commands) throws InterruptedException; diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java index 2e2b0f529..404b22e7b 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/AbstractTargetCommand.java @@ -3,14 +3,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.job.flow.Flow; -import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.batch.core.listener.StepExecutionListenerSupport; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.tasklet.TaskletStep; import com.redis.riot.AbstractTransferCommand; import com.redis.riot.RedisOptions; @@ -56,14 +55,20 @@ public CompareOptions getCompareOptions() { @Override protected JobBuilder configureJob(JobBuilder job) { - return super.configureJob(job).listener(new CleanupJobExecutionListener(targetRedisOptions)); + return super.configureJob(job).listener(new JobExecutionListenerSupport() { + + @Override + public void afterJob(JobExecution jobExecution) { + targetRedisOptions.shutdown(); + } + }); } protected void initialMax(RiotStepBuilder step) { step.initialMax(readerOptions.initialMaxSupplier(estimator())); } - protected Flow verificationFlow() throws Exception { + protected Step verificationStep() throws Exception { RedisItemReader> sourceReader = readerOptions .configureScanReader(configureJobRepository(reader(getRedisOptions()).dataStructure())).build(); log.debug("Creating key comparator with TTL tolerance of {} seconds", compareOptions.getTtlTolerance()); @@ -83,16 +88,19 @@ protected Flow verificationFlow() throws Exception { step.listener(new StepExecutionListenerSupport() { @Override public ExitStatus afterStep(StepExecution stepExecution) { + if (stepExecution.getStatus().isUnsuccessful()) { + return null; + } if (writer.getResults().isOK()) { log.info("Verification completed - all OK"); - return super.afterStep(stepExecution); + return ExitStatus.COMPLETED; } try { Thread.sleep(transferOptions.getProgressUpdateIntervalMillis()); } catch (InterruptedException e) { log.debug("Verification interrupted"); Thread.currentThread().interrupt(); - return null; + return ExitStatus.STOPPED; } KeyComparisonResults results = writer.getResults(); log.warn("Verification failed: OK={} Missing={} Values={} TTLs={} Types={}", results.getOK(), @@ -100,8 +108,7 @@ public ExitStatus afterStep(StepExecution stepExecution) { return new ExitStatus(ExitStatus.FAILED.getExitCode(), "Verification failed"); } }); - TaskletStep verificationStep = step.build(); - return new FlowBuilder(VERIFICATION_NAME).start(verificationStep).build(); + return step.build(); } private DataStructureValueReaderBuilder dataStructureValueReader(RedisOptions redisOptions) { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareCommand.java index 5b022ebf5..08cb4c3f6 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/CompareCommand.java @@ -1,6 +1,7 @@ package com.redis.riot.redis; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; import picocli.CommandLine.Command; @@ -8,8 +9,8 @@ public class CompareCommand extends AbstractTargetCommand { @Override - protected Flow flow() throws Exception { - return verificationFlow(); + protected Job job(JobBuilder jobBuilder) throws Exception { + return jobBuilder.start(verificationStep()).build(); } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/InfoCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/InfoCommand.java index 31b1b3168..b914b4e7d 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/InfoCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/InfoCommand.java @@ -14,7 +14,9 @@ public class InfoCommand extends AbstractRedisCommandCommand { @Override protected void execute(RedisModulesCommands commands) { - log.info(commands.info()); + if (log.isInfoEnabled()) { + log.info(commands.info()); + } } @Override diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java index 887cb350e..d6f3345a7 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/ReplicateCommand.java @@ -1,13 +1,18 @@ package com.redis.riot.redis; +import java.util.Optional; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; import org.springframework.batch.core.job.builder.FlowBuilder; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.builder.JobFlowBuilder; +import org.springframework.batch.core.job.builder.SimpleJobBuilder; import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.repeat.RepeatStatus; import org.springframework.core.task.SimpleAsyncTaskExecutor; import com.redis.riot.FlushingTransferOptions; @@ -28,14 +33,6 @@ public class ReplicateCommand extends AbstractTargetCommand { private static final Logger log = LoggerFactory.getLogger(ReplicateCommand.class); - private static final String SKIPPED_VERIFICATION_NAME = "skipped-verification-notification"; - - private static final String LIVE_REPLICATION_NAME = "live-replication"; - - private static final String SCAN_REPLICATION_NAME = "scan-replication"; - - private static final String REPLICATION_NAME = "replication"; - @CommandLine.Mixin private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions(); @CommandLine.Mixin @@ -62,59 +59,61 @@ public RedisWriterOptions getWriterOptions() { } @Override - protected Flow flow() throws Exception { - if (replicationOptions.isVerify()) { - return new FlowBuilder(REPLICATION_NAME).start(replicationFlow()).next(verificationFlow()) - .build(); - } - return replicationFlow(); - } - - @Override - protected Flow verificationFlow() throws Exception { - if (processorOptions.getKeyProcessor() == null) { - return super.verificationFlow(); - } - // Verification cannot be done if a processor is set - return flow(SKIPPED_VERIFICATION_NAME, step(SKIPPED_VERIFICATION_NAME).tasklet((contribution, chunkContext) -> { - log.info("Key processor enabled, skipping verification"); - return RepeatStatus.FINISHED; - }).build()); - } - - private Flow replicationFlow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { + Optional verificationStep = optionalVerificationStep(); switch (replicationOptions.getMode()) { case LIVE: - SimpleFlow notificationFlow = new FlowBuilder(LIVE_REPLICATION_NAME).start(liveStep()).build(); - SimpleFlow scanFlow = new FlowBuilder(SCAN_REPLICATION_NAME).start(scanStep()).build(); - return new FlowBuilder(REPLICATION_NAME).split(new SimpleAsyncTaskExecutor()) - .add(notificationFlow, scanFlow).build(); + SimpleFlow notificationFlow = new FlowBuilder("live-replication-flow") + .start(liveReplicationStep()).build(); + SimpleFlow scanFlow = new FlowBuilder("scan-replication-flow").start(scanStep()).build(); + SimpleFlow replicationFlow = new FlowBuilder("replication-flow") + .split(new SimpleAsyncTaskExecutor()).add(notificationFlow, scanFlow).build(); + JobFlowBuilder jobFlowBuilder = jobBuilder.start(replicationFlow); + verificationStep.ifPresent(jobFlowBuilder::next); + return jobFlowBuilder.build().build(); case LIVEONLY: - return new FlowBuilder(LIVE_REPLICATION_NAME).start(liveStep()).build(); + SimpleJobBuilder liveReplicationJob = jobBuilder.start(liveReplicationStep()); + verificationStep.ifPresent(liveReplicationJob::next); + return liveReplicationJob.build(); + case SNAPSHOT: + SimpleJobBuilder scanReplicationJob = jobBuilder.start(scanStep()); + verificationStep.ifPresent(scanReplicationJob::next); + return scanReplicationJob.build(); default: - return new FlowBuilder(SCAN_REPLICATION_NAME).start(scanStep()).build(); + throw new IllegalArgumentException("Unknown replication mode: " + replicationOptions.getMode()); + } + } + + protected Optional optionalVerificationStep() throws Exception { + if (replicationOptions.isVerify()) { + if (processorOptions.getKeyProcessor() == null) { + return Optional.of(verificationStep()); + } + // Verification cannot be done if a processor is set + log.warn("Key processor enabled, verification will be skipped"); } + return Optional.empty(); } @SuppressWarnings({ "rawtypes", "unchecked" }) private TaskletStep scanStep() throws Exception { RedisItemReader reader = reader().build(); reader.setName("redis-scan-reader"); - RiotStepBuilder scanStep = riotStep(SCAN_REPLICATION_NAME, "Scanning"); + RiotStepBuilder scanStep = riotStep("scan-replication-step", "Scanning"); initialMax(scanStep); return configure(scanStep.reader(reader)).build().build(); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private TaskletStep liveStep() throws Exception { + private TaskletStep liveReplicationStep() throws Exception { RedisItemReader reader = reader().live().keyPatterns(readerOptions.getScanMatch()) .notificationQueueCapacity(replicationOptions.getNotificationQueueCapacity()) .database(getRedisOptions().uris().get(0).getDatabase()) .flushingInterval(flushingTransferOptions.getFlushIntervalDuration()) .idleTimeout(flushingTransferOptions.getIdleTimeoutDuration()).build(); - reader.setName("redis--live-reader"); + reader.setName("redis-live-reader"); return configure( - riotStep(LIVE_REPLICATION_NAME, "Listening").reader(reader).flushingOptions(flushingTransferOptions)) + riotStep("live-replication-step", "Listening").reader(reader).flushingOptions(flushingTransferOptions)) .build().build(); } @@ -125,7 +124,7 @@ private RiotStepBuilder configure(RiotStepBuilder step) { @SuppressWarnings("rawtypes") private ItemWriter writer() { - log.info("Configuring writer with {}", targetRedisOptions); + log.debug("Configuring writer with {}", targetRedisOptions); OperationItemWriterBuilder writer = writer(targetRedisOptions); return writerOptions.configureWriter(redisWriter(writer)).build(); } diff --git a/connectors/riot-redis/src/test/java/com/redis/riot/redis/TestReplicate.java b/connectors/riot-redis/src/test/java/com/redis/riot/redis/TestReplicate.java index 5f6cd0996..76213259d 100644 --- a/connectors/riot-redis/src/test/java/com/redis/riot/redis/TestReplicate.java +++ b/connectors/riot-redis/src/test/java/com/redis/riot/redis/TestReplicate.java @@ -16,8 +16,8 @@ import com.redis.spring.batch.support.compare.KeyComparisonResults; import com.redis.testcontainers.RedisContainer; import com.redis.testcontainers.RedisServer; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; import io.lettuce.core.RedisURI; import picocli.CommandLine; @@ -81,6 +81,12 @@ void replicateLive(RedisTestContext container) throws Exception { runLiveReplication("replicate-live", container); } + @ParameterizedTest + @RedisTestContextsSource + void replicateLiveThreads(RedisTestContext container) throws Exception { + runLiveReplication("replicate-live-threads", container); + } + @ParameterizedTest @RedisTestContextsSource void replicateDSLive(RedisTestContext container) throws Exception { diff --git a/connectors/riot-redis/src/test/resources/replicate-live-threads b/connectors/riot-redis/src/test/resources/replicate-live-threads new file mode 100644 index 000000000..462602fa8 --- /dev/null +++ b/connectors/riot-redis/src/test/resources/replicate-live-threads @@ -0,0 +1 @@ +riot-redis -h source -p 6379 replicate -h target -p 6380 --mode live --threads 3 \ No newline at end of file diff --git a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java index 9423ac080..88a0aaea1 100644 --- a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java +++ b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamExportCommand.java @@ -1,14 +1,16 @@ package com.redis.riot.stream; -import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemProcessor; import org.springframework.core.convert.converter.Converter; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -39,7 +41,7 @@ public class StreamExportCommand extends AbstractTransferCommand { @CommandLine.Mixin private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions(); @Parameters(arity = "0..*", description = "One ore more streams to read from", paramLabel = "STREAM") - private String[] streams; + private List streams; @CommandLine.Mixin private KafkaOptions options = new KafkaOptions(); @Option(names = "--offset", description = "XREAD offset (default: ${DEFAULT-VALUE})", paramLabel = "") @@ -55,11 +57,11 @@ public void setFlushingTransferOptions(FlushingTransferOptions flushingTransferO this.flushingTransferOptions = flushingTransferOptions; } - public String[] getStreams() { + public List getStreams() { return streams; } - public void setStreams(String[] streams) { + public void setStreams(List streams) { this.streams = streams; } @@ -88,17 +90,22 @@ public void setTopic(String topic) { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { Assert.isTrue(!ObjectUtils.isEmpty(streams), "No stream specified"); - List steps = new ArrayList<>(); - for (String stream : streams) { - StreamItemReader reader = reader(getRedisOptions()).stream(stream).build(); - RiotStepBuilder, ProducerRecord> step = riotStep( - stream + "-" + NAME, "Exporting from " + stream); - steps.add(step.reader(reader).processor(processor()).writer(writer()) - .flushingOptions(flushingTransferOptions).build().build()); + Iterator streamIterator = streams.iterator(); + SimpleJobBuilder simpleJobBuilder = jobBuilder.start(streamExportStep(streamIterator.next())); + while (streamIterator.hasNext()) { + simpleJobBuilder.next(streamExportStep(streamIterator.next())); } - return flow(NAME, steps.toArray(new Step[0])); + return simpleJobBuilder.build(); + } + + private TaskletStep streamExportStep(String stream) throws Exception { + StreamItemReader reader = reader(getRedisOptions()).stream(stream).build(); + RiotStepBuilder, ProducerRecord> step = riotStep( + stream + "-" + NAME, "Exporting from " + stream); + return step.reader(reader).processor(processor()).writer(writer()).flushingOptions(flushingTransferOptions) + .build().build(); } private KafkaItemWriter writer() { diff --git a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java index fc2e29d4a..0ada04252 100644 --- a/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java +++ b/connectors/riot-stream/src/main/java/com/redis/riot/stream/StreamImportCommand.java @@ -1,6 +1,6 @@ package com.redis.riot.stream; -import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -8,8 +8,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.job.flow.Flow; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.builder.SimpleJobBuilder; +import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -40,7 +42,7 @@ public class StreamImportCommand extends AbstractTransferCommand { @CommandLine.Mixin private FlushingTransferOptions flushingTransferOptions = new FlushingTransferOptions(); @Parameters(arity = "0..*", description = "One ore more topics to read from", paramLabel = "TOPIC") - private String[] topics; + private List topics; @CommandLine.Mixin private KafkaOptions options = new KafkaOptions(); @Option(names = "--key", description = "Target stream key (default: same as topic)", paramLabel = "") @@ -62,11 +64,11 @@ public void setFlushingTransferOptions(FlushingTransferOptions flushingTransferO this.flushingTransferOptions = flushingTransferOptions; } - public String[] getTopics() { + public List getTopics() { return topics; } - public void setTopics(String[] topics) { + public void setTopics(List topics) { this.topics = topics; } @@ -119,26 +121,29 @@ public void setWriterOptions(RedisWriterOptions writerOptions) { } @Override - protected Flow flow() throws Exception { + protected Job job(JobBuilder jobBuilder) throws Exception { Assert.isTrue(!ObjectUtils.isEmpty(topics), "No topic specified"); - List steps = new ArrayList<>(); - Properties consumerProperties = options.consumerProperties(); - log.debug("Using Kafka consumer properties: {}", consumerProperties); - for (String topic : topics) { - log.debug("Creating Kafka reader for topic {}", topic); - KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0) - .consumerProperties(consumerProperties).partitions(0).name(topic).saveState(false).topic(topic) - .build(); - RiotStepBuilder, ConsumerRecord> step = riotStep( - topic + "-" + NAME, "Importing from " + topic); - Xadd> xadd = Xadd - .>key(keyConverter()).body(bodyConverter()) - .args(xAddArgs()).build(); - RedisItemWriter> writer = writerOptions - .configureWriter(writer(getRedisOptions()).operation(xadd)).build(); - steps.add(step.reader(reader).writer(writer).flushingOptions(flushingTransferOptions).build().build()); + Iterator topicIterator = topics.iterator(); + SimpleJobBuilder simpleJobBuilder = jobBuilder.start(topicImportStep(topicIterator.next())); + while (topicIterator.hasNext()) { + simpleJobBuilder.next(topicImportStep(topicIterator.next())); } - return flow(NAME, steps.toArray(new Step[0])); + return simpleJobBuilder.build(); + } + + private TaskletStep topicImportStep(String topic) throws Exception { + Properties consumerProperties = options.consumerProperties(); + log.debug("Creating Kafka reader for topic {} with {}", topic, consumerProperties); + KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0) + .consumerProperties(consumerProperties).partitions(0).name(topic).saveState(false).topic(topic).build(); + RiotStepBuilder, ConsumerRecord> step = riotStep( + topic + "-" + NAME, "Importing from " + topic); + Xadd> xadd = Xadd + .>key(keyConverter()).body(bodyConverter()) + .args(xAddArgs()).build(); + RedisItemWriter> writer = writerOptions + .configureWriter(writer(getRedisOptions()).operation(xadd)).build(); + return step.reader(reader).writer(writer).flushingOptions(flushingTransferOptions).build().build(); } private Converter, Map> bodyConverter() { diff --git a/connectors/riot-stream/src/main/java/com/redis/riot/stream/kafka/KafkaItemReader.java b/connectors/riot-stream/src/main/java/com/redis/riot/stream/kafka/KafkaItemReader.java index ac29ff1e9..97747fa30 100644 --- a/connectors/riot-stream/src/main/java/com/redis/riot/stream/kafka/KafkaItemReader.java +++ b/connectors/riot-stream/src/main/java/com/redis/riot/stream/kafka/KafkaItemReader.java @@ -64,7 +64,6 @@ public class KafkaItemReader extends AbstractItemStreamItemReader kafkaConsumer; private Iterator> consumerRecords; private boolean saveState = true; - private boolean open; /** * Create a new {@link KafkaItemReader}. @@ -139,7 +138,6 @@ public void open(ExecutionContext executionContext) throws ItemStreamException { this.kafkaConsumer.assign(this.topicPartitions); this.partitionOffsets.forEach(this.kafkaConsumer::seek); super.open(executionContext); - open = true; } @Override @@ -177,11 +175,6 @@ public void close() { if (this.kafkaConsumer != null) { this.kafkaConsumer.close(); } - open = false; } - @Override - public boolean isOpen() { - return open; - } } diff --git a/connectors/riot-stream/src/test/java/com/redis/riot/stream/TestKafka.java b/connectors/riot-stream/src/test/java/com/redis/riot/stream/TestKafka.java index 514759da3..445a2176e 100644 --- a/connectors/riot-stream/src/test/java/com/redis/riot/stream/TestKafka.java +++ b/connectors/riot-stream/src/test/java/com/redis/riot/stream/TestKafka.java @@ -37,8 +37,8 @@ import com.redis.lettucemod.api.async.RedisModulesAsyncCommands; import com.redis.lettucemod.api.sync.RedisModulesCommands; import com.redis.riot.AbstractRiotIntegrationTests; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; -import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource; +import com.redis.testcontainers.junit.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContextsSource; import io.lettuce.core.LettuceFutures; import io.lettuce.core.Range; diff --git a/core/riot-core/riot-core.gradle b/core/riot-core/riot-core.gradle index fce1d2bb3..42fc57885 100644 --- a/core/riot-core/riot-core.gradle +++ b/core/riot-core/riot-core.gradle @@ -1,15 +1,12 @@ dependencies { + api 'org.slf4j:slf4j-api' api group: 'com.redis', name: 'lettucemod', version: lettucemodVersion api 'org.apache.commons:commons-pool2' api group: 'com.redis', name: 'spring-batch-redis', version: batchRedisVersion api 'org.springframework.batch:spring-batch-core' - api 'org.slf4j:slf4j-api' - implementation 'org.slf4j:slf4j-jdk14' api group: 'info.picocli', name: 'picocli', version: picocliVersion annotationProcessor group: 'info.picocli', name: 'picocli-codegen', version: picocliVersion implementation group: 'me.tongfei', name: 'progressbar', version: progressbarVersion implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' - api group: 'com.google.code.findbugs', name: 'jsr305', version: jsr305Version - testAnnotationProcessor group: 'info.picocli', name: 'picocli-codegen', version: picocliVersion - testImplementation "org.junit.jupiter:junit-jupiter" + implementation 'org.slf4j:slf4j-jdk14' } \ No newline at end of file diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java index a6a6a9b7b..133979322 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractExportCommand.java @@ -8,11 +8,11 @@ import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.support.DataStructure; -import picocli.CommandLine; +import picocli.CommandLine.ArgGroup; public abstract class AbstractExportCommand extends AbstractTransferCommand { - @CommandLine.ArgGroup(exclusive = false, heading = "Reader options%n") + @ArgGroup(exclusive = false, heading = "Reader options%n") private RedisReaderOptions options = new RedisReaderOptions(); protected AbstractTaskletStepBuilder, O>> step(String name, String taskName, diff --git a/core/riot-core/src/main/java/com/redis/riot/AbstractRiotCommand.java b/core/riot-core/src/main/java/com/redis/riot/AbstractRiotCommand.java index 305f761d8..aed866d70 100644 --- a/core/riot-core/src/main/java/com/redis/riot/AbstractRiotCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/AbstractRiotCommand.java @@ -1,19 +1,12 @@ package com.redis.riot; -import java.util.Arrays; -import java.util.Iterator; import java.util.concurrent.Callable; -import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobExecutionListener; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.job.flow.Flow; -import org.springframework.batch.core.job.flow.support.SimpleFlow; +import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.batch.core.step.builder.StepBuilder; -import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import com.redis.spring.batch.builder.JobRepositoryBuilder; @@ -56,24 +49,13 @@ protected StepBuilder step(String name) throws Exception { return getJobRunner().step(name); } - protected final Flow flow(String name, Step... steps) { - Assert.notEmpty(steps, "Steps are required"); - FlowBuilder flow = new FlowBuilder<>(name); - Iterator iterator = Arrays.asList(steps).iterator(); - flow.start(iterator.next()); - while (iterator.hasNext()) { - flow.next(iterator.next()); - } - return flow.build(); - } - @Override public Integer call() throws Exception { return exitCode(execute()); } private int exitCode(JobExecution execution) { - if (execution.getExitStatus().compareTo(ExitStatus.FAILED) >= 0) { + if (execution.getStatus().isUnsuccessful()) { return 1; } return 0; @@ -81,11 +63,18 @@ private int exitCode(JobExecution execution) { public JobExecution execute() throws Exception { JobRunner runner = getJobRunner(); - return runner.run(configureJob(runner.job(commandName())).start(flow()).build().build()); + return runner.run(job(configureJob(runner.job(commandName())))); } + protected abstract Job job(JobBuilder jobBuilder) throws Exception; + protected JobBuilder configureJob(JobBuilder job) { - return job.listener(new CleanupJobExecutionListener(getRedisOptions())); + return job.listener(new JobExecutionListenerSupport() { + @Override + public void afterJob(JobExecution jobExecution) { + getRedisOptions().shutdown(); + } + }); } private String commandName() { @@ -95,28 +84,6 @@ private String commandName() { return commandSpec.name(); } - protected static class CleanupJobExecutionListener implements JobExecutionListener { - - private final RedisOptions redisOptions; - - public CleanupJobExecutionListener(RedisOptions redisOptions) { - this.redisOptions = redisOptions; - } - - @Override - public void beforeJob(JobExecution jobExecution) { - // do nothing - } - - @Override - public void afterJob(JobExecution jobExecution) { - redisOptions.shutdown(); - } - - } - - protected abstract Flow flow() throws Exception; - @SuppressWarnings({ "unchecked", "rawtypes" }) protected B configureJobRepository(B builder) throws Exception { JobRunner runner = getJobRunner(); diff --git a/core/riot-core/src/main/java/com/redis/riot/ManifestVersionProvider.java b/core/riot-core/src/main/java/com/redis/riot/ManifestVersionProvider.java index 9fcfcc9fd..502ce0ddb 100644 --- a/core/riot-core/src/main/java/com/redis/riot/ManifestVersionProvider.java +++ b/core/riot-core/src/main/java/com/redis/riot/ManifestVersionProvider.java @@ -6,13 +6,13 @@ import java.util.jar.Attributes; import java.util.jar.Manifest; -import picocli.CommandLine; +import picocli.CommandLine.IVersionProvider; /** * {@link picocli.CommandLine.IVersionProvider} implementation that returns * version information from the jar file's {@code /META-INF/MANIFEST.MF} file. */ -public class ManifestVersionProvider implements CommandLine.IVersionProvider { +public class ManifestVersionProvider implements IVersionProvider { @Override public String[] getVersion() throws Exception { diff --git a/core/riot-core/src/main/java/com/redis/riot/RedisReaderOptions.java b/core/riot-core/src/main/java/com/redis/riot/RedisReaderOptions.java index f3dc00a0d..0fb0f16d5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/RedisReaderOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/RedisReaderOptions.java @@ -114,7 +114,7 @@ public void setPoolMaxTotal(int poolMaxTotal) { @SuppressWarnings("rawtypes") public B configureScanReader(B builder) { - log.info("Configuring scan reader with {} {} {}", scanCount, scanMatch, scanType); + log.debug("Configuring scan reader with {} {} {}", scanCount, scanMatch, scanType); builder.match(scanMatch); builder.count(scanCount); if (scanType != null) { @@ -125,7 +125,7 @@ public B configureScanReader(B builder) { @SuppressWarnings({ "rawtypes", "unchecked" }) public B configureReader(B builder) { - log.info("Configuring reader with threads: {}, batch-size: {}, queue-capacity: {}", threads, batchSize, + log.debug("Configuring reader with threads: {}, batch-size: {}, queue-capacity: {}", threads, batchSize, queueCapacity); builder.threads(threads); builder.chunkSize(batchSize); @@ -137,22 +137,18 @@ public B configureReader(B builder) { public GenericObjectPoolConfig> poolConfig() { GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(poolMaxTotal); - log.info("Configuring reader with pool config {}", config); + log.debug("Configuring reader with pool config {}", config); return config; } - public ScanSizeEstimatorBuilder configureEstimator(ScanSizeEstimatorBuilder builder) { - builder.match(scanMatch).sampleSize(sampleSize); - if (scanType != null) { - builder.type(scanType.getName()); - } - return builder; - } - public Supplier initialMaxSupplier(ScanSizeEstimatorBuilder estimator) { return () -> { try { - return configureEstimator(estimator).build().call(); + estimator.match(scanMatch).sampleSize(sampleSize); + if (scanType != null) { + estimator.type(scanType.getName()); + } + return estimator.build().call(); } catch (Exception e) { log.warn("Could not estimate scan size", e); return null; diff --git a/core/riot-core/src/main/java/com/redis/riot/RiotApp.java b/core/riot-core/src/main/java/com/redis/riot/RiotApp.java index 51f684264..0034c9430 100644 --- a/core/riot-core/src/main/java/com/redis/riot/RiotApp.java +++ b/core/riot-core/src/main/java/com/redis/riot/RiotApp.java @@ -17,6 +17,8 @@ import picocli.CommandLine.Mixin; import picocli.CommandLine.Option; import picocli.CommandLine.ParseResult; +import picocli.CommandLine.RunFirst; +import picocli.CommandLine.RunLast; @Command(sortOptions = false, versionProvider = ManifestVersionProvider.class, subcommands = GenerateCompletionCommand.class, abbreviateSynopsis = true) public class RiotApp extends HelpCommand { @@ -40,12 +42,12 @@ public LoggingOptions getLoggingOptions() { private int executionStrategy(ParseResult parseResult) { configureLogging(); - return new CommandLine.RunLast().execute(parseResult); // default execution strategy + return new RunLast().execute(parseResult); // default execution strategy } private int executionStragegyRunFirst(ParseResult parseResult) { configureLogging(); - return new CommandLine.RunFirst().execute(parseResult); + return new RunFirst().execute(parseResult); } protected void configureLogging() { diff --git a/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java b/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java index 17351f9cd..bfee8b001 100644 --- a/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/RiotStepBuilder.java @@ -24,6 +24,7 @@ import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.support.FlushingStepBuilder; +import com.redis.spring.batch.support.PollableItemReader; public class RiotStepBuilder { @@ -97,11 +98,7 @@ public FaultTolerantStepBuilder build() { } FaultTolerantStepBuilder ftStep = faultTolerant(step).skipPolicy(skipPolicy(options.getSkipPolicy())); if (options.getThreads() > 1) { - if (reader instanceof ItemStreamReader) { - SynchronizedItemStreamReader synchronizedReader = new SynchronizedItemStreamReader<>(); - synchronizedReader.setDelegate((ItemStreamReader) reader); - ftStep.reader(synchronizedReader); - } + ftStep.reader(synchronize(reader)); ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(options.getThreads()); taskExecutor.setMaxPoolSize(options.getThreads()); @@ -116,6 +113,20 @@ public FaultTolerantStepBuilder build() { return ftStep; } + private ItemReader synchronize(ItemReader reader) { + if (reader instanceof PollableItemReader) { + SynchronizedPollableItemReader pollableReader = new SynchronizedPollableItemReader<>(); + pollableReader.setDelegate((PollableItemReader) reader); + return pollableReader; + } + if (reader instanceof ItemStreamReader) { + SynchronizedItemStreamReader streamReader = new SynchronizedItemStreamReader<>(); + streamReader.setDelegate((ItemStreamReader) reader); + return streamReader; + } + return reader; + } + private FaultTolerantStepBuilder faultTolerant(SimpleStepBuilder step) { if (flushingOptions == null) { return step.faultTolerant(); diff --git a/core/riot-core/src/main/java/com/redis/riot/SynchronizedPollableItemReader.java b/core/riot-core/src/main/java/com/redis/riot/SynchronizedPollableItemReader.java new file mode 100644 index 000000000..903ca0027 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/SynchronizedPollableItemReader.java @@ -0,0 +1,53 @@ +package com.redis.riot; + +import java.util.concurrent.TimeUnit; + +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.NonTransientResourceException; +import org.springframework.batch.item.ParseException; +import org.springframework.batch.item.UnexpectedInputException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import com.redis.spring.batch.support.PollableItemReader; + +public class SynchronizedPollableItemReader implements PollableItemReader, InitializingBean { + + private PollableItemReader delegate; + + /** + * This delegates to the read method of the delegate + */ + @Nullable + public synchronized T read() + throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { + return this.delegate.read(); + } + + public void close() { + this.delegate.close(); + } + + public void open(ExecutionContext executionContext) { + this.delegate.open(executionContext); + } + + public void update(ExecutionContext executionContext) { + this.delegate.update(executionContext); + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.delegate, "A delegate item reader is required"); + } + + public void setDelegate(PollableItemReader delegate) { + this.delegate = delegate; + } + + @Override + public T poll(long timeout, TimeUnit unit) throws Exception { + return delegate.poll(timeout, unit); + } +} diff --git a/core/riot-test/riot-test.gradle b/core/riot-test/riot-test.gradle index 95f7e9486..f6e102830 100644 --- a/core/riot-test/riot-test.gradle +++ b/core/riot-test/riot-test.gradle @@ -1,8 +1,6 @@ dependencies { implementation project(':riot-core') - implementation group: 'org.codehaus.plexus', name: 'plexus-utils', version:plexusVersion - annotationProcessor group: 'info.picocli', name: 'picocli-codegen', version: picocliVersion - api group: 'com.redis.testcontainers', name: 'testcontainers-redis-junit-jupiter', version: testcontainersRedisVersion - api group: 'org.awaitility', name: 'awaitility', version: awaitilityVersion implementation group: 'commons-io', name: 'commons-io', version: commonsIoVersion -} + implementation group: 'org.codehaus.plexus', name: 'plexus-utils', version:plexusVersion + api group: 'com.redis.testcontainers', name: 'testcontainers-redis-junit', version: testcontainersRedisVersion +} \ No newline at end of file diff --git a/core/riot-test/src/main/java/com/redis/riot/AbstractRiotIntegrationTests.java b/core/riot-test/src/main/java/com/redis/riot/AbstractRiotIntegrationTests.java index 188c5a305..d2e8fad92 100644 --- a/core/riot-test/src/main/java/com/redis/riot/AbstractRiotIntegrationTests.java +++ b/core/riot-test/src/main/java/com/redis/riot/AbstractRiotIntegrationTests.java @@ -14,7 +14,7 @@ import com.redis.testcontainers.RedisClusterContainer; import com.redis.testcontainers.RedisContainer; import com.redis.testcontainers.RedisServer; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; +import com.redis.testcontainers.junit.RedisTestContext; @Testcontainers public abstract class AbstractRiotIntegrationTests extends AbstractRiotTests { diff --git a/core/riot-test/src/main/java/com/redis/riot/AbstractRiotTests.java b/core/riot-test/src/main/java/com/redis/riot/AbstractRiotTests.java index 95356fdbf..92487bdc6 100644 --- a/core/riot-test/src/main/java/com/redis/riot/AbstractRiotTests.java +++ b/core/riot-test/src/main/java/com/redis/riot/AbstractRiotTests.java @@ -12,8 +12,8 @@ import org.springframework.batch.core.JobExecution; import com.redis.spring.batch.support.generator.Generator.GeneratorBuilder; -import com.redis.testcontainers.junit.jupiter.AbstractTestcontainersRedisTestBase; -import com.redis.testcontainers.junit.jupiter.RedisTestContext; +import com.redis.testcontainers.junit.AbstractTestcontainersRedisTestBase; +import com.redis.testcontainers.junit.RedisTestContext; import io.lettuce.core.RedisURI; import picocli.CommandLine; diff --git a/core/riot-core/src/test/java/com/redis/riot/convert/IdConverterBuilderTests.java b/core/riot-test/src/main/java/com/redis/riot/convert/IdConverterBuilderTests.java similarity index 100% rename from core/riot-core/src/test/java/com/redis/riot/convert/IdConverterBuilderTests.java rename to core/riot-test/src/main/java/com/redis/riot/convert/IdConverterBuilderTests.java diff --git a/gradle.properties b/gradle.properties index 0c5d50fae..d440c4c02 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group=com.redis -bootPluginVersion=2.6.1 +bootPluginVersion=2.6.2 dependencyPluginVersion=1.0.11.RELEASE kordampPluginVersion=0.47.0 kordampBuildVersion=2.6.0 @@ -8,22 +8,22 @@ kordampBuildVersion=2.6.0 avroVersion=1.11.0 awaitilityVersion=4.1.1 awsVersion=2.2.6.RELEASE -batchRedisVersion=2.23.9 +batchRedisVersion=2.24.0 commonsIoVersion=2.11.0 -db2Version=11.5.6.0 +db2Version=11.5.7.0 fakerVersion=1.0.2 gcpVersion=1.2.8.RELEASE googleHttpVersion=1.38.0 jsr305Version=3.0.2 -kafkaVersion=7.0.0 +kafkaVersion=7.0.1 latencyutilsVersion=2.0.3 lettucemodVersion=1.8.1 -mssqlVersion=9.4.0.jre8 +mssqlVersion=9.4.1.jre8 oracleVersion=19.3.0.0 picocliVersion=4.6.2 plexusVersion=3.4.1 progressbarVersion=0.9.2 protobufVersion=3.14.0 sqliteVersion=3.36.0.3 -testcontainersRedisVersion=1.4.6 +testcontainersRedisVersion=1.4.7 testcontainersVersion=1.15.3 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e750102e0..2e6e5897b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists