FEAT : Add multithreading
Add error handling and send errors to a topic
Add sending the file's line count to a topic
pierre-maraval committed Nov 22, 2023
1 parent de07647 commit 1b1af39
Showing 9 changed files with 83 additions and 149 deletions.
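
In short, the commit replaces the deleted single-threaded TopicProducer with a fixed pool of nbThread workers, each KBART line being sent in its own Kafka transaction, then publishes the file's total line count to a dedicated topic once the pool has drained. A condensed sketch of that flow follows; it is not the committed code verbatim (the LoadSketch class and json() helper are placeholders, and the error-topic branch is elided):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LoadSketch {
    void load(KafkaTemplate<String, String> kafkaTemplate, String file, String kbartHeader,
              String topicKbart, String topicNbLines, int nbThread) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(nbThread);
        int lineCounter = 0;
        try (BufferedReader buff = new BufferedReader(new FileReader(file))) {
            for (String ligneKbart : buff.lines().toList()) {
                if (ligneKbart.contains(kbartHeader)) continue; // skip the KBART header row
                lineCounter++;
                // one task per line, each sent in its own Kafka transaction and
                // spread over nbThread partitions of the target topic
                executor.execute(() -> kafkaTemplate.executeInTransaction(kt ->
                        kt.send(new ProducerRecord<>(topicKbart,
                                new Random().nextInt(nbThread), "", json(ligneKbart)))));
            }
        } finally {
            executor.shutdown();
        }
        executor.awaitTermination(1, TimeUnit.HOURS);
        // once every worker is done, publish the total line count to its own topic
        kafkaTemplate.send(MessageBuilder.withPayload(String.valueOf(lineCounter))
                .setHeader(KafkaHeaders.TOPIC, topicNbLines)
                .build());
    }

    // placeholder for the Jackson serialization done in the real FileService
    private String json(String ligneKbart) { return ligneKbart; }
}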
6 changes: 2 additions & 4 deletions src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
@@ -10,6 +10,7 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.io.BufferedReader;
@@ -55,14 +56,11 @@ public void run(String... args) throws IOException {
} catch (IllegalFileFormatException | IllegalProviderException e) {
throw new RuntimeException(e);
}
// Compute the total number of lines
BufferedReader kbartTotalLines = new BufferedReader(new FileReader(tsvFile));
service.loadFile(tsvFile, kbartHeader, kbartTotalLines.lines().count() - 1);
service.loadFile(tsvFile, kbartHeader);
}
long endTime = System.currentTimeMillis();
double executionTime = (double) (endTime - startTime) / 1000;
log.info("Temps d'exécution : " + executionTime + " secondes");

}


KafkaConfig.java
@@ -10,14 +10,12 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@EnableTransactionManagement
public class KafkaConfig {

@Value("${spring.kafka.producer.bootstrap-servers}")
@@ -37,9 +35,6 @@ public Map<String, Object> producerConfigs() {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(512*1024));
return props;
}

7 changes: 1 addition & 6 deletions src/main/java/fr/abes/kbart2kafka/dto/Header.java
@@ -9,12 +9,7 @@ public class Header {

private String fileName;

private long totalNumberOfLine;

private int currentLine;

public Header(String fileName, long totalNumberOfLine) {
public Header(String fileName) {
this.fileName = fileName;
this.totalNumberOfLine = totalNumberOfLine;
}
}
90 changes: 0 additions & 90 deletions src/main/java/fr/abes/kbart2kafka/kafka/TopicProducer.java

This file was deleted.

112 changes: 74 additions & 38 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -24,10 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;

@Service
@Slf4j
@@ -36,65 +34,103 @@ public class FileService {
@Value("${topic.name.target.kbart}")
private String topicKbart;

@Value("${topic.name.target.nblines}")
private String topicNbLines;

@Value("${topic.name.target.errors}")
private String topicErrors;

@Value("${spring.kafka.producer.nbthread}")
private int nbThread;
private final KafkaTemplate<String, String> kafkaTemplate;

private final ObjectMapper mapper;
ExecutorService executor = Executors.newFixedThreadPool(5);
ExecutorService executor;

public FileService(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper mapper) {
this.kafkaTemplate = kafkaTemplate;
this.mapper = mapper;
}

@PostConstruct
void initExecutor() {
executor = Executors.newFixedThreadPool(nbThread);
}

@Transactional
public void loadFile(File fichier, String kbartHeader, long totalNumberOfLine) {
public void loadFile(File fichier, String kbartHeader) {
try {
executeMultiThread(fichier, kbartHeader, totalNumberOfLine);
executeMultiThread(fichier, kbartHeader);
} catch (IOException ex) {
log.error("Erreur dans la lecture du fichier");
}

}

private void executeMultiThread(File fichier, String kbartHeader, long totalNumberOfLine) throws IOException {
private void executeMultiThread(File fichier, String kbartHeader) throws IOException {
// Counter for the number of lines in the kbart file
int lineCounter = 0;
// Create the header and add the total number of lines
Header kafkaHeader = new Header(fichier.getName(), totalNumberOfLine);
BufferedReader buff = new BufferedReader(new FileReader(fichier));
for (String ligneKbart : buff.lines().toList()) {
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Create a new DTO, set its fields and send it to the topicProducer service
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

// Send the kbart line to the producer
kafkaHeader.setCurrentLine(lineCounter);
executor.execute(() -> {
try {
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("CurrentLine", String.valueOf(kafkaHeader.getCurrentLine()).getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("TotalLine", String.valueOf(kafkaHeader.getTotalNumberOfLine()).getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(5), "", mapper.writeValueAsString(ligneKbartDto), headers);
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
} catch (JsonProcessingException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
});
Header kafkaHeader = new Header(fichier.getName());
try (BufferedReader buff = new BufferedReader(new FileReader(fichier))) {
for (String ligneKbart : buff.lines().toList()) {
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Create a new DTO, set its fields and send it to the topicProducer service
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

final int finalLineCounter = lineCounter;
executor.execute(() -> {
try {
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(nbThread), "", mapper.writeValueAsString(ligneKbartDto), headers);
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
result.whenComplete((sr, ex) -> {
try {
log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
} catch (JsonProcessingException | InterruptedException | ExecutionException e) {
sendErrorToKafka("Erreur dans le chargement à la ligne " + finalLineCounter, e, kafkaHeader);
throw new RuntimeException(e);
}
});
} catch (JsonProcessingException e) {
sendErrorToKafka("erreur de mapping des données au chargement de la ligne " + finalLineCounter, e, kafkaHeader);
throw new RuntimeException(e);
}
});

}
}

} catch (IOException ex) {
sendErrorToKafka("erreur de lecture du fichier", ex, kafkaHeader);
} finally {
executor.shutdown();
}
try {
executor.awaitTermination(1, TimeUnit.HOURS);
log.info("envoi nb lignes");
Message<String> message = MessageBuilder
.withPayload(String.valueOf(lineCounter))
.setHeader(KafkaHeaders.TOPIC, topicNbLines)
.setHeader("FileName", kafkaHeader.getFileName())
.build();
kafkaTemplate.send(message);
} catch (InterruptedException e) {
sendErrorToKafka("Erreur dans l'écriture du nombre de lignes dans le topic", e, kafkaHeader);
throw new RuntimeException(e);
}
executor.shutdown();
log.info("fin de boucle");
}

private void sendErrorToKafka(String errorMessage, Exception exception, Header kafkaHeader) {
Message<String> message = MessageBuilder
.withPayload("OK")
.setHeader(KafkaHeaders.TOPIC, topicKbart)
.withPayload(errorMessage + exception.getMessage())
.setHeader(KafkaHeaders.TOPIC, topicErrors)
.setHeader("FileName", kafkaHeader.getFileName())
.setHeader("CurrentLine", kafkaHeader.getCurrentLine())
.setHeader("TotalLine", kafkaHeader.getTotalNumberOfLine())
.build();

kafkaTemplate.send(message);
}

1 change: 1 addition & 0 deletions src/main/resources/application-dev.properties
@@ -1,5 +1,6 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.nbthread=

logging.level.root=INFO
logging.level.fr.abes=DEBUG
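
The new spring.kafka.producer.nbthread key is committed empty in all three profiles. A filled-in dev profile might look like the following, where the broker address and pool size are illustrative values, not part of the commit:

# Producer properties (illustrative values)
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.nbthread=5

logging.level.root=INFO
logging.level.fr.abes=DEBUG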
1 change: 1 addition & 0 deletions src/main/resources/application-prod.properties
@@ -1,5 +1,6 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.nbthread=

logging.level.root=INFO
logging.level.fr.abes=ERROR
3 changes: 2 additions & 1 deletion src/main/resources/application-test.properties
@@ -1,5 +1,6 @@
# Producer properties
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.nbthread=

logging.level.root=INFO
logging.level.fr.abes=DEBUG
logging.level.fr.abes=INFO
7 changes: 2 additions & 5 deletions src/main/resources/application.properties
@@ -21,11 +21,8 @@ spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.producer.transaction-timeout=1800000

topic.name.target.kbart=bacon.kbart.toload


spring.jpa.open-in-view=false

spring.mvc.pathmatch.matching-strategy=ant_path_matcher
topic.name.target.nblines=bacon.kbart.toload.nbLines
topic.name.target.errors=bacon.kbart.toload.errors

# Header of a kbart file
kbart.header=publication_title\tprint_identifier\tonline_identifier\tdate_first_issue_online\tnum_first_vol_online\tnum_first_issue_online\tdate_last_issue_online\tnum_last_vol_online\tnum_last_issue_online\ttitle_url\tfirst_author\ttitle_id\tembargo_info\tcoverage_depth\tnotes\tpublisher_name\tpublication_type\tdate_monograph_published_print\tdate_monograph_published_online\tmonograph_volume\tmonograph_edition\tfirst_editor\tparent_publication_title_id\tpreceding_publication_title_id\taccess_type
