Skip to content

Commit

Permalink
Added synchronized PollableItemReader. Fixes #81
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Dec 30, 2021
1 parent f1b26ab commit 64000e0
Show file tree
Hide file tree
Showing 38 changed files with 331 additions and 268 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.14.3
2.14.4-SNAPSHOT
14 changes: 4 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,12 @@ subprojects {
license {
exclude('build/**')
}

dependencies {
testImplementation "org.junit.jupiter:junit-jupiter"
testImplementation "org.junit.jupiter:junit-jupiter-params"
testImplementation group: 'commons-io', name: 'commons-io', version: commonsIoVersion
testImplementation group: 'org.codehaus.plexus', name: 'plexus-utils', version:plexusVersion
testImplementation group: 'com.redis.testcontainers', name: 'testcontainers-redis', version: testcontainersRedisVersion
testImplementation group: 'com.redis.testcontainers', name: 'junit-jupiter', version: testcontainersVersion

testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
compileOnly group: 'com.google.code.findbugs', name: 'jsr305', version: jsr305Version
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
useJUnitPlatform()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;

Expand Down Expand Up @@ -41,12 +42,12 @@ public DataSourceOptions getDataSourceOptions() {
}

@Override
protected Flow flow() throws Exception {
log.info("Creating data source: {}", dataSourceOptions);
protected Job job(JobBuilder jobBuilder) throws Exception {
log.debug("Creating data source with {}", dataSourceOptions);
DataSource dataSource = dataSourceOptions.dataSource();
try (Connection connection = dataSource.getConnection()) {
String dbName = connection.getMetaData().getDatabaseProductName();
log.info("Creating {} database writer: {}", dbName, exportOptions);
log.debug("Creating writer for database {} with {}", dbName, exportOptions);
JdbcBatchItemWriterBuilder<Map<String, Object>> builder = new JdbcBatchItemWriterBuilder<>();
builder.itemSqlParameterSourceProvider(NullableMapSqlParameterSource::new);
builder.dataSource(dataSource);
Expand All @@ -55,7 +56,8 @@ protected Flow flow() throws Exception {
JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
writer.afterPropertiesSet();
DataStructureItemProcessor processor = DataStructureItemProcessor.of(exportOptions.getKeyRegex());
return flow(NAME, step(NAME, String.format("Exporting to %s", dbName), processor, writer).build());
return jobBuilder.start(step(NAME, String.format("Exporting to %s", dbName), processor, writer).build())
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.jdbc.core.ColumnMapRowMapper;
Expand Down Expand Up @@ -37,7 +38,7 @@ public DataSourceOptions getDataSourceOptions() {
}

@Override
protected Flow flow() throws Exception {
protected Job job(JobBuilder jobBuilder) throws Exception {
log.debug("Creating data source: {}", dataSourceOptions);
DataSource dataSource = dataSourceOptions.dataSource();
try (Connection connection = dataSource.getConnection()) {
Expand All @@ -62,7 +63,7 @@ protected Flow flow() throws Exception {
builder.verifyCursorPosition(importOptions.isVerifyCursorPosition());
JdbcCursorItemReader<Map<String, Object>> reader = builder.build();
reader.afterPropertiesSet();
return flow(NAME, step(NAME, "Importing from " + name, reader).build());
return jobBuilder.start(step(NAME, "Importing from " + name, reader).build()).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.redis.riot.db;

import com.redis.riot.AbstractRiotIntegrationTests;
import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.testcontainers.containers.JdbcDatabaseContainer;
import picocli.CommandLine;

import javax.sql.DataSource;
import com.redis.riot.AbstractRiotIntegrationTests;

import picocli.CommandLine;

public abstract class AbstractDatabaseTests extends AbstractRiotIntegrationTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.spring.batch.support.generator.Generator.Type;
import com.redis.testcontainers.junit.jupiter.RedisTestContext;
import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;

@Testcontainers
@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
Expand Down Expand Up @@ -63,23 +66,28 @@ public RedisWriterOptions getWriterOptions() {
}

@Override
protected Flow flow() throws Exception {
protected Job job(JobBuilder jobBuilder) throws Exception {
Assert.isTrue(!ObjectUtils.isEmpty(files), "No file specified");
List<String> expandedFiles = FileUtils.expand(files);
if (ObjectUtils.isEmpty(expandedFiles)) {
throw new FileNotFoundException("File not found: " + String.join(", ", files));
}
List<Step> steps = new ArrayList<>();
for (String file : expandedFiles) {
DumpFileType fileType = fileType(file);
Resource resource = options.inputResource(file);
AbstractItemStreamItemReader<DataStructure<String>> reader = reader(fileType, resource);
reader.setName(file + "-" + NAME + "-reader");
RiotStepBuilder<DataStructure<String>, DataStructure<String>> step = riotStep(file + "-" + NAME,
"Importing " + file);
steps.add(step.reader(reader).processor(this::processDataStructure).writer(writer()).build().build());
Iterator<String> fileIterator = expandedFiles.iterator();
SimpleJobBuilder simpleJobBuilder = jobBuilder.start(fileImportStep(fileIterator.next()));
while (fileIterator.hasNext()) {
simpleJobBuilder.next(fileImportStep(fileIterator.next()));
}
return flow(NAME, steps.toArray(new Step[0]));
return simpleJobBuilder.build();
}

private TaskletStep fileImportStep(String file) throws Exception {
DumpFileType fileType = fileType(file);
Resource resource = options.inputResource(file);
AbstractItemStreamItemReader<DataStructure<String>> reader = reader(fileType, resource);
reader.setName(file + "-" + NAME + "-reader");
RiotStepBuilder<DataStructure<String>, DataStructure<String>> step = riotStep(file + "-" + NAME,
"Importing " + file);
return step.reader(reader).processor(this::processDataStructure).writer(writer()).build().build();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;
Expand Down Expand Up @@ -42,10 +43,10 @@ public FileExportOptions getOptions() {
}

@Override
protected Flow flow() throws Exception {
protected Job job(JobBuilder jobBuilder) throws Exception {
WritableResource resource = options.outputResource(file);
String taskName = String.format("Exporting %s", resource.getFilename());
return flow(NAME, step(NAME, taskName, writer(resource)).build());
return jobBuilder.start(step(NAME, taskName, writer(resource)).build()).build();
}

private ItemWriter<DataStructure<String>> writer(WritableResource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.batch.item.file.LineCallbackHandler;
Expand Down Expand Up @@ -78,27 +80,30 @@ public FileImportOptions getOptions() {
}

@Override
protected Flow flow() throws Exception {
protected Job job(JobBuilder jobBuilder) throws Exception {
Assert.isTrue(!ObjectUtils.isEmpty(files), "No file specified");
List<String> expandedFiles = FileUtils.expand(files);
if (ObjectUtils.isEmpty(expandedFiles)) {
throw new FileNotFoundException("File not found: " + String.join(", ", files));
}
List<Step> steps = new ArrayList<>();
for (String file : expandedFiles) {
FileType fileType = type(file);
if (fileType == null) {
throw new IllegalArgumentException("Could not determine type of file " + file);
}
Resource resource = options.inputResource(file);
AbstractItemStreamItemReader<Map<String, Object>> reader = reader(file, fileType, resource);
reader.setName(file + "-" + NAME + "-reader");
FaultTolerantStepBuilder<Map<String, Object>, Map<String, Object>> step = step(file + "-" + NAME,
"Importing " + resource.getFilename(), reader);
step.skip(FlatFileParseException.class);
steps.add(step.build());
Iterator<String> fileIterator = expandedFiles.iterator();
SimpleJobBuilder simpleJobBuilder = jobBuilder.start(fileImportStep(fileIterator.next()));
while (fileIterator.hasNext()) {
simpleJobBuilder.next(fileImportStep(fileIterator.next()));
}
return simpleJobBuilder.build();
}

private TaskletStep fileImportStep(String file) throws Exception {
FileType fileType = type(file);
if (fileType == null) {
throw new IllegalArgumentException("Could not determine type of file " + file);
}
return flow(NAME, steps.toArray(new Step[0]));
Resource resource = options.inputResource(file);
AbstractItemStreamItemReader<Map<String, Object>> reader = reader(file, fileType, resource);
reader.setName(file + "-" + NAME + "-reader");
return step(file + "-" + NAME, "Importing " + resource.getFilename(), reader).skip(FlatFileParseException.class)
.build();
}

private FileType type(String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.redis.riot.file.RiotFile;
import com.redis.testcontainers.RedisModulesContainer;
import com.redis.testcontainers.RedisServer;
import com.redis.testcontainers.junit.jupiter.RedisTestContext;
import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;

@SuppressWarnings("unchecked")
@Testcontainers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import com.redis.riot.file.resource.XmlObjectReader;
import com.redis.riot.redis.HsetCommand;
import com.redis.spring.batch.support.DataStructure;
import com.redis.testcontainers.junit.jupiter.RedisTestContext;
import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;

import io.lettuce.core.GeoArgs;
import io.lettuce.core.api.sync.RedisGeoCommands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;

import com.redis.lettucemod.RedisModulesUtils;
Expand All @@ -30,8 +32,9 @@ public class GeneratorImportCommand extends AbstractImportCommand {
private GenerateOptions options = new GenerateOptions();

@Override
protected Flow flow() throws Exception {
return flow(NAME, step(NAME, "Generating", reader()).build());
protected Job job(JobBuilder jobBuilder) throws Exception {
TaskletStep step = step(NAME, "Generating", reader()).build();
return jobBuilder.start(step).build();
}

private ItemReader<Map<String, Object>> reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import com.redis.lettucemod.search.Document;
import com.redis.lettucemod.search.Field;
import com.redis.lettucemod.search.Field.TextField.PhoneticMatcher;
import com.redis.lettucemod.search.SearchResults;
import com.redis.riot.AbstractRiotIntegrationTests;
import com.redis.lettucemod.search.SearchResults;
import com.redis.testcontainers.RedisModulesContainer;
import com.redis.testcontainers.RedisServer;
import com.redis.testcontainers.junit.jupiter.RedisTestContext;
import com.redis.testcontainers.junit.jupiter.RedisTestContextsSource;
import com.redis.testcontainers.junit.RedisTestContext;
import com.redis.testcontainers.junit.RedisTestContextsSource;

import io.lettuce.core.Range;
import io.lettuce.core.StreamMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.redis.riot.redis;

import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.repeat.RepeatStatus;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
Expand All @@ -13,15 +15,15 @@
public abstract class AbstractRedisCommandCommand extends AbstractRiotCommand {

@Override
protected Flow flow() throws Exception {
protected Job job(JobBuilder jobBuilder) throws Exception {
String name = name();
return flow(name, step(name).tasklet((contribution, chunkContext) -> {
Step step = step(name).tasklet((contribution, chunkContext) -> {
try (StatefulRedisModulesConnection<String, String> connection = getRedisOptions().connect()) {
RedisModulesCommands<String, String> commands = connection.sync();
execute(commands);
execute(connection.sync());
return RepeatStatus.FINISHED;
}
}).build());
}).build();
return jobBuilder.start(step).build();
}

protected abstract void execute(RedisModulesCommands<String, String> commands) throws InterruptedException;
Expand Down
Loading

0 comments on commit 64000e0

Please sign in to comment.