Skip to content

Commit

Permalink
refactor!: Introduced readers() method
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Jul 11, 2022
1 parent a7d83d1 commit a6e0064
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

import org.slf4j.Logger;
Expand All @@ -16,6 +18,7 @@
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.batch.item.file.LineCallbackHandler;
Expand Down Expand Up @@ -82,65 +85,81 @@ public FileImportOptions getOptions() {

@Override
protected Job job(JobBuilder jobBuilder) throws Exception {
Iterator<TaskletStep> iterator = fileImportSteps().iterator();
SimpleJobBuilder simpleJobBuilder = jobBuilder.start(iterator.next());
while (iterator.hasNext()) {
simpleJobBuilder.next(iterator.next());
}
return simpleJobBuilder.build();
}

private List<TaskletStep> fileImportSteps() throws Exception {
Map<Resource, ItemReader<Map<String, Object>>> readers = readers();
List<TaskletStep> steps = new ArrayList<>();
for (Entry<Resource, ItemReader<Map<String, Object>>> entry : readers.entrySet()) {
Resource resource = entry.getKey();
ItemReader<Map<String, Object>> reader = entry.getValue();
steps.add(step(resource.getDescription() + "-" + NAME, "Importing " + resource.getFilename(), reader)
.skip(FlatFileParseException.class).build());
}
return steps;
}

public Map<Resource, ItemReader<Map<String, Object>>> readers() throws IOException {
Assert.isTrue(!ObjectUtils.isEmpty(files), "No file specified");
List<String> expandedFiles = FileUtils.expand(files);
if (ObjectUtils.isEmpty(expandedFiles)) {
throw new FileNotFoundException("File not found: " + String.join(", ", files));
}
Iterator<String> fileIterator = expandedFiles.iterator();
SimpleJobBuilder simpleJobBuilder = jobBuilder.start(fileImportStep(fileIterator.next()));
while (fileIterator.hasNext()) {
simpleJobBuilder.next(fileImportStep(fileIterator.next()));
Map<Resource, ItemReader<Map<String, Object>>> readers = new LinkedHashMap<>();
for (String file : expandedFiles) {
Resource resource = options.inputResource(file);
AbstractItemStreamItemReader<Map<String, Object>> reader = reader(resource);
reader.setName(file + "-" + NAME + "-reader");
readers.put(resource, reader);
}
return simpleJobBuilder.build();
}

private TaskletStep fileImportStep(String file) throws Exception {
AbstractItemStreamItemReader<Map<String, Object>> reader = reader(file);
reader.setName(file + "-" + NAME + "-reader");
return step(file + "-" + NAME, "Importing " + file, reader).skip(FlatFileParseException.class).build();
return readers;
}

private Optional<FileType> type(String file) {
private Optional<FileType> type(Optional<String> extension) {
if (type.isPresent()) {
return type;
}
Optional<String> extension = FileUtils.extension(file);
if (extension.isPresent()) {
switch (extension.get().toLowerCase()) {
case FileUtils.EXTENSION_FW:
return Optional.of(FileType.FIXED);
case FileUtils.EXTENSION_JSON:
return Optional.of(FileType.JSON);
case FileUtils.EXTENSION_XML:
return Optional.of(FileType.XML);
case FileUtils.EXTENSION_CSV:
case FileUtils.EXTENSION_PSV:
case FileUtils.EXTENSION_TSV:
return Optional.of(FileType.DELIMITED);
default:
return Optional.empty();
}
if (extension.isEmpty()) {
return Optional.empty();
}
switch (extension.get().toLowerCase()) {
case FileUtils.EXTENSION_FW:
return Optional.of(FileType.FIXED);
case FileUtils.EXTENSION_JSON:
return Optional.of(FileType.JSON);
case FileUtils.EXTENSION_XML:
return Optional.of(FileType.XML);
case FileUtils.EXTENSION_CSV:
case FileUtils.EXTENSION_PSV:
case FileUtils.EXTENSION_TSV:
return Optional.of(FileType.DELIMITED);
default:
return Optional.empty();
}
return Optional.empty();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public AbstractItemStreamItemReader<Map<String, Object>> reader(String file) throws IOException {
Optional<FileType> fileType = type(file);
public AbstractItemStreamItemReader<Map<String, Object>> reader(Resource resource) {
Optional<String> extension = FileUtils.extension(resource.getFilename());
Optional<FileType> fileType = type(extension);
if (fileType.isEmpty()) {
throw new IllegalArgumentException("Could not determine type of file " + file);
throw new IllegalArgumentException("Could not determine file type for " + resource);
}
Resource resource = options.inputResource(file);
switch (fileType.get()) {
case DELIMITED:
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter(options.delimiter(file));
tokenizer.setDelimiter(options.delimiter(extension));
tokenizer.setQuoteCharacter(options.getQuoteCharacter());
if (!ObjectUtils.isEmpty(options.getIncludedFields())) {
tokenizer.setIncludedFields(options.getIncludedFields());
}
log.debug("Creating delimited reader with {} for file {}", options, file);
log.debug("Creating delimited reader with {} for {}", options, resource);
return flatFileReader(resource, tokenizer);
case FIXED:
FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
Expand All @@ -153,13 +172,13 @@ public AbstractItemStreamItemReader<Map<String, Object>> reader(String file) thr
"Invalid ranges specified: " + Arrays.toString(options.getColumnRanges()));
}
fixedLengthTokenizer.setColumns(ranges);
log.debug("Creating fixed-width reader with {} for file {}", options, file);
log.debug("Creating fixed-width reader with {} for {}", options, resource);
return flatFileReader(resource, fixedLengthTokenizer);
case XML:
log.debug("Creating XML reader for file {}", file);
log.debug("Creating XML reader for {}", resource);
return (XmlItemReader) FileUtils.xmlReader(resource, Map.class);
default:
log.debug("Creating JSON reader for file {}", file);
log.debug("Creating JSON reader for {}", resource);
return (JsonItemReader) FileUtils.jsonReader(resource, Map.class);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ public int getLinesToSkip() {
return 0;
}

public String delimiter(String file) {
public String delimiter(Optional<String> extension) {
if (delimiter.isPresent()) {
return delimiter.get();
}
Optional<String> extension = FileUtils.extension(file);
if (extension.isEmpty()) {
throw new IllegalArgumentException("Could not determine delimiter for extension " + extension);
}
Expand Down

0 comments on commit a6e0064

Please sign in to comment.