FEAT : Tests multi threading
pierre-maraval committed Nov 22, 2023
1 parent 6d71d74 commit de07647
Showing 6 changed files with 170 additions and 132 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -105,6 +105,11 @@
</exclusions>
</dependency>

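<!-- AspectJ weaver, presumably pulled in for Spring's @EnableTransactionManagement support; the version is presumably managed by the Spring Boot parent POM -->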
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

<!-- === Mapping === -->
<dependency>
<groupId>org.modelmapper</groupId>
144 changes: 17 additions & 127 deletions src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
@@ -1,54 +1,31 @@
package fr.abes.kbart2kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import fr.abes.kbart2kafka.exception.IllegalFileFormatException;
import fr.abes.kbart2kafka.exception.IllegalProviderException;
import fr.abes.kbart2kafka.kafka.TopicProducer;
import fr.abes.kbart2kafka.service.FileService;
import fr.abes.kbart2kafka.utils.CheckFiles;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@EnableKafka
@SpringBootApplication
public class Kbart2kafkaApplication implements CommandLineRunner {

@Value("${kbart.header}")
private String kbartHeader;
private final FileService service;

private final TopicProducer topicProducer;

private final ObjectMapper mapper;

private final KafkaTransactionManager kafkaTransactionManager;

ExecutorService executor = Executors.newFixedThreadPool(5);

public Kbart2kafkaApplication(TopicProducer topicProducer, ObjectMapper mapper, KafkaTransactionManager kafkaTransactionManager) {
this.topicProducer = topicProducer;
this.mapper = mapper;
this.kafkaTransactionManager = kafkaTransactionManager;
public Kbart2kafkaApplication(FileService service) {
this.service = service;
}

public static void main(String[] args) {
@@ -62,118 +39,31 @@ public static void main(String[] args) {
* @throws IOException thrown when no tsv file was found.
*/
@Override
@Transactional
// we specify the class that triggers rollback; by default it is every unhandled class, i.e. everything except IOException
public void run(String... args) throws IOException {

long startTime = System.currentTimeMillis();
// Check that a parameter was supplied when launching Kbart2kafkaApplication
if (args.length == 0 || args[0] == null || args[0].trim().isEmpty()) {
log.error("Message envoyé : {}", "Le chemin d'accès au fichier tsv n'a pas été trouvé dans les paramètres de l'application");
} else {
log.info("Debut envois kafka de : " + args[0]);
// Retrieve the file path
File tsvFile = new File(args[0]);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = kafkaTransactionManager.getTransaction(def);
log.info(String.valueOf(status.isNewTransaction()));
// Call the file verification service
try {
// Call the file verification service
CheckFiles.verifyFile(tsvFile, kbartHeader);

// Compute the total number of lines
Scanner kbartTotalLines = new Scanner(tsvFile);
int totalNumberOfLine = 0;
while (kbartTotalLines.hasNextLine()) {
String ligneKbart = kbartTotalLines.nextLine();
if (!ligneKbart.contains(kbartHeader)) {
totalNumberOfLine++;
}
}

// Counter for the number of lines in the kbart
Scanner kbart = new Scanner(tsvFile);
int lineCounter = 0;

// Create the header and add the total number of lines
Header kafkaHeader = new Header(tsvFile.getName(), totalNumberOfLine);

while (kbart.hasNextLine()) {
String ligneKbart = kbart.nextLine();
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;

// Create a new dto object, set its fields, and send it to the topicProducer service
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

// Send the kbart line to the producer
kafkaHeader.setCurrentLine(lineCounter);

executor.submit(() -> {
try {
topicProducer.sendLigneKbart(ligneKbartDto, kafkaHeader);
//log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
} catch (JsonProcessingException e) {
kafkaTransactionManager.rollback(status);
throw new RuntimeException(e);
}
});
}
}
// Send the end-of-processing message "OK" to the producer
kafkaTransactionManager.commit(status);
log.info(String.valueOf(status.isCompleted()));
} catch (IOException e) {
kafkaTransactionManager.rollback(status);
throw new IOException(e);
} catch (IllegalFileFormatException | IllegalProviderException e) {
kafkaTransactionManager.rollback(status);
throw new RuntimeException(e);
}
finally {
executor.shutdown();
}
// Compute the total number of lines
BufferedReader kbartTotalLines = new BufferedReader(new FileReader(tsvFile));
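// count() - 1: the kbart header line is not a data line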
service.loadFile(tsvFile, kbartHeader, kbartTotalLines.lines().count() - 1);
}
}
long endTime = System.currentTimeMillis();
double executionTime = (double) (endTime - startTime) / 1000;
log.info("Temps d'exécution : " + executionTime + " secondes");

/**
* Builds the dto
*
* @param line the input line
* @return a DTO initialized with the information from the line
*/
private LigneKbartDto constructDto(String[] line) {
LigneKbartDto kbartLineInDtoObject = new LigneKbartDto();
kbartLineInDtoObject.setPublication_title(line[0]);
kbartLineInDtoObject.setPrint_identifier(line[1]);
kbartLineInDtoObject.setOnline_identifier(line[2]);
kbartLineInDtoObject.setDate_first_issue_online(line[3]);
kbartLineInDtoObject.setNum_first_vol_online(Integer.getInteger(line[4]));
kbartLineInDtoObject.setNum_first_issue_online(Integer.getInteger(line[5]));
kbartLineInDtoObject.setDate_last_issue_online(line[6]);
kbartLineInDtoObject.setNum_last_vol_online(Integer.getInteger(line[7]));
kbartLineInDtoObject.setNum_last_issue_online(Integer.getInteger(line[8]));
kbartLineInDtoObject.setTitle_url(line[9]);
kbartLineInDtoObject.setFirst_author(line[10]);
kbartLineInDtoObject.setTitle_id(line[11]);
kbartLineInDtoObject.setEmbargo_info(line[12]);
kbartLineInDtoObject.setCoverage_depth(line[13]);
kbartLineInDtoObject.setNotes(line[14]);
kbartLineInDtoObject.setPublisher_name(line[15]);
kbartLineInDtoObject.setPublication_type(line[16]);
kbartLineInDtoObject.setDate_monograph_published_print(line[17]);
kbartLineInDtoObject.setDate_monograph_published_online(line[18]);
kbartLineInDtoObject.setMonograph_volume(Integer.getInteger(line[19]));
kbartLineInDtoObject.setMonograph_edition(line[20]);
kbartLineInDtoObject.setFirst_editor(line[21]);
kbartLineInDtoObject.setParent_publication_title_id(line[22]);
kbartLineInDtoObject.setPreceding_publication_title_id(line[23]);
kbartLineInDtoObject.setAccess_type(line[24]);
// Check whether a best ppn is already present in the kbart
if (line.length == 26) {
kbartLineInDtoObject.setBestPpn(line[25]);
}
return kbartLineInDtoObject;
}


}
7 changes: 6 additions & 1 deletion KafkaConfig.java (renamed from SenderConfig.java)
@@ -10,13 +10,15 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class SenderConfig {
@EnableTransactionManagement
public class KafkaConfig {

@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@@ -35,6 +37,9 @@ public Map<String, Object> producerConfigs() {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
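// Producer throughput tuning added below: snappy compression, up to 20 ms of lingering, 512 KiB batches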
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(512*1024));
return props;
}

4 changes: 2 additions & 2 deletions src/main/java/fr/abes/kbart2kafka/dto/Header.java
@@ -9,11 +9,11 @@ public class Header {

private String fileName;

private int totalNumberOfLine;
private long totalNumberOfLine;

private int currentLine;

public Header(String fileName, int totalNumberOfLine) {
public Header(String fileName, long totalNumberOfLine) {
this.fileName = fileName;
this.totalNumberOfLine = totalNumberOfLine;
}
140 changes: 140 additions & 0 deletions src/main/java/fr/abes/kbart2kafka/service/FileService.java
@@ -0,0 +1,140 @@
package fr.abes.kbart2kafka.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Service
@Slf4j
public class FileService {

@Value("${topic.name.target.kbart}")
private String topicKbart;

private final KafkaTemplate<String, String> kafkaTemplate;

private final ObjectMapper mapper;
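// fixed pool of 5 sender threads, mirroring the 5 partitions targeted by the random partitioner below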
ExecutorService executor = Executors.newFixedThreadPool(5);

public FileService(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper mapper) {
this.kafkaTemplate = kafkaTemplate;
this.mapper = mapper;
}

@Transactional
public void loadFile(File fichier, String kbartHeader, long totalNumberOfLine) {
try {
executeMultiThread(fichier, kbartHeader, totalNumberOfLine);
} catch (IOException ex) {
log.error("Erreur dans la lecture du fichier");
}

}

private void executeMultiThread(File fichier, String kbartHeader, long totalNumberOfLine) throws IOException {
// Counter for the number of lines in the kbart
int lineCounter = 0;
// Create the header and add the total number of lines
Header kafkaHeader = new Header(fichier.getName(), totalNumberOfLine);
BufferedReader buff = new BufferedReader(new FileReader(fichier));
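// note: toList() materialises the entire file in memory before the loop starts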
for (String ligneKbart : buff.lines().toList()) {
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;
// Create a new dto object, set its fields, and send it to the topicProducer service
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

// Send the kbart line to the producer
kafkaHeader.setCurrentLine(lineCounter);
executor.execute(() -> {
try {
List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("CurrentLine", String.valueOf(kafkaHeader.getCurrentLine()).getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("TotalLine", String.valueOf(kafkaHeader.getTotalNumberOfLine()).getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(5), "", mapper.writeValueAsString(ligneKbartDto), headers);
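// each record is sent in its own short Kafka transaction; result.get() blocks this worker until the send completes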
CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
} catch (JsonProcessingException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
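// NB: shutdown() only stops new submissions, it does not wait for queued sends to finish (see the sketch after this method)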
executor.shutdown();
Message<String> message = MessageBuilder
.withPayload("OK")
.setHeader(KafkaHeaders.TOPIC, topicKbart)
.setHeader("FileName", kafkaHeader.getFileName())
.setHeader("CurrentLine", kafkaHeader.getCurrentLine())
.setHeader("TotalLine", kafkaHeader.getTotalNumberOfLine())
.build();

kafkaTemplate.send(message);
}
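
As flagged above, executor.shutdown() returns immediately, so the final "OK" message can reach the topic while worker threads are still sending lines. A minimal sketch of a stricter ending for executeMultiThread, assuming the same executor field; the one-hour timeout is an illustrative value, not something in this commit:

// Sketch only: drain the pool before signalling end of processing.
executor.shutdown();                                  // stop accepting new tasks
try {
    // block until the workers have finished (timeout is illustrative)
    if (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.HOURS)) {
        log.warn("Timed out while waiting for kafka sends to complete");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();               // restore the interrupt flag
}
// ... then build and send the "OK" message as above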

/**
* Builds the dto
*
* @param line the input line
* @return a DTO initialized with the information from the line
*/
private LigneKbartDto constructDto(String[] line) {
LigneKbartDto kbartLineInDtoObject = new LigneKbartDto();
kbartLineInDtoObject.setPublication_title(line[0]);
kbartLineInDtoObject.setPrint_identifier(line[1]);
kbartLineInDtoObject.setOnline_identifier(line[2]);
kbartLineInDtoObject.setDate_first_issue_online(line[3]);
kbartLineInDtoObject.setNum_first_vol_online(Integer.getInteger(line[4]));
kbartLineInDtoObject.setNum_first_issue_online(Integer.getInteger(line[5]));
kbartLineInDtoObject.setDate_last_issue_online(line[6]);
kbartLineInDtoObject.setNum_last_vol_online(Integer.getInteger(line[7]));
kbartLineInDtoObject.setNum_last_issue_online(Integer.getInteger(line[8]));
kbartLineInDtoObject.setTitle_url(line[9]);
kbartLineInDtoObject.setFirst_author(line[10]);
kbartLineInDtoObject.setTitle_id(line[11]);
kbartLineInDtoObject.setEmbargo_info(line[12]);
kbartLineInDtoObject.setCoverage_depth(line[13]);
kbartLineInDtoObject.setNotes(line[14]);
kbartLineInDtoObject.setPublisher_name(line[15]);
kbartLineInDtoObject.setPublication_type(line[16]);
kbartLineInDtoObject.setDate_monograph_published_print(line[17]);
kbartLineInDtoObject.setDate_monograph_published_online(line[18]);
kbartLineInDtoObject.setMonograph_volume(Integer.getInteger(line[19]));
kbartLineInDtoObject.setMonograph_edition(line[20]);
kbartLineInDtoObject.setFirst_editor(line[21]);
kbartLineInDtoObject.setParent_publication_title_id(line[22]);
kbartLineInDtoObject.setPreceding_publication_title_id(line[23]);
kbartLineInDtoObject.setAccess_type(line[24]);
// Check whether a best ppn is already present in the kbart
if (line.length == 26) {
kbartLineInDtoObject.setBestPpn(line[25]);
}
return kbartLineInDtoObject;
}
}
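
One caveat in constructDto, present both here and in the copy removed from Kbart2kafkaApplication: Integer.getInteger(String) looks up a system property named by its argument rather than parsing the string, so every numeric column comes out null. A hedged sketch of the presumably intended conversion; parseIntOrNull is a hypothetical helper, not part of this commit:

// Hypothetical helper: parse a numeric kbart cell, tolerating empty values.
private static Integer parseIntOrNull(String value) {
    try {
        return (value == null || value.isBlank()) ? null : Integer.valueOf(value.trim());
    } catch (NumberFormatException e) {
        return null;    // mirror the lenient null behaviour of Integer.getInteger
    }
}
// usage: kbartLineInDtoObject.setNum_first_vol_online(parseIntOrNull(line[4]));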
2 changes: 0 additions & 2 deletions src/main/resources/application.properties
@@ -8,8 +8,6 @@ application.name=@project.name@
application.version=@project.version@
application.basedir=@webBaseDir@

# Http server configuration
server.port=8087

# Log configuration
log4j2.logdir=logs
