From de076473d399abccf2fda92d51fa308d106235cc Mon Sep 17 00:00:00 2001
From: pierre-maraval
Date: Wed, 22 Nov 2023 08:15:58 +0100
Subject: [PATCH] FEAT : Tests multi threading

---
 pom.xml                                            |   5 +
 .../kbart2kafka/Kbart2kafkaApplication.java        | 144 +++---------------
 .../{SenderConfig.java => KafkaConfig.java}        |   7 +-
 .../java/fr/abes/kbart2kafka/dto/Header.java       |   4 +-
 .../abes/kbart2kafka/service/FileService.java      | 140 +++++++++++++++++
 src/main/resources/application.properties          |   2 -
 6 files changed, 170 insertions(+), 132 deletions(-)
 rename src/main/java/fr/abes/kbart2kafka/configuration/{SenderConfig.java => KafkaConfig.java} (86%)
 create mode 100644 src/main/java/fr/abes/kbart2kafka/service/FileService.java

diff --git a/pom.xml b/pom.xml
index 8d13a9a..52c58ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,11 @@
         </dependency>
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjweaver</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.modelmapper</groupId>
diff --git a/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java b/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
index 3e91b21..0baac17 100644
--- a/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
+++ b/src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
@@ -1,12 +1,8 @@
 package fr.abes.kbart2kafka;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import fr.abes.kbart2kafka.dto.Header;
-import fr.abes.kbart2kafka.dto.LigneKbartDto;
 import fr.abes.kbart2kafka.exception.IllegalFileFormatException;
 import fr.abes.kbart2kafka.exception.IllegalProviderException;
-import fr.abes.kbart2kafka.kafka.TopicProducer;
+import fr.abes.kbart2kafka.service.FileService;
 import fr.abes.kbart2kafka.utils.CheckFiles;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
@@ -14,41 +10,22 @@
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.transaction.KafkaTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
-import java.util.Scanner;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 @Slf4j
-@EnableKafka
 @SpringBootApplication
 public class Kbart2kafkaApplication implements CommandLineRunner {
-
     @Value("${kbart.header}")
     private String kbartHeader;
+    private final FileService service;
 
-    private final TopicProducer topicProducer;
-
-    private final ObjectMapper mapper;
-
-    private final KafkaTransactionManager kafkaTransactionManager;
-
-    ExecutorService executor = Executors.newFixedThreadPool(5);
-
-    public Kbart2kafkaApplication(TopicProducer topicProducer, ObjectMapper mapper, KafkaTransactionManager kafkaTransactionManager) {
-        this.topicProducer = topicProducer;
-        this.mapper = mapper;
-        this.kafkaTransactionManager = kafkaTransactionManager;
+    public Kbart2kafkaApplication(FileService service) {
+        this.service = service;
     }
 
     public static void main(String[] args) {
@@ -62,10 +39,9 @@ public static void main(String[] args) {
      * @throws IOException thrown when no tsv file has been found.
      */
     @Override
-    @Transactional // we specify the class that triggers a rollback; by default it is every unhandled exception, i.e. everything except IOException
     public void run(String... args) throws IOException {
-
+        long startTime = System.currentTimeMillis();
         // Check that a parameter was passed when launching Kbart2kafkaApplication
         if (args.length == 0 || args[0] == null || args[0].trim().isEmpty()) {
             log.error("Message envoyé : {}", "Le chemin d'accès au fichier tsv n'a pas été trouvé dans les paramètres de l'application");
@@ -73,107 +49,21 @@ public void run(String... args) throws IOException {
             log.info("Debut envois kafka de : " + args[0]);
             // Get the path of the file to process
             File tsvFile = new File(args[0]);
-            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
-            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
-            TransactionStatus status = kafkaTransactionManager.getTransaction(def);
-            log.info(String.valueOf(status.isNewTransaction()));
+            // Call the file verification service
             try {
-                // Call the file verification service
                 CheckFiles.verifyFile(tsvFile, kbartHeader);
-
-                // Compute the total number of lines
-                Scanner kbartTotalLines = new Scanner(tsvFile);
-                int totalNumberOfLine = 0;
-                while (kbartTotalLines.hasNextLine()) {
-                    String ligneKbart = kbartTotalLines.nextLine();
-                    if (!ligneKbart.contains(kbartHeader)) {
-                        totalNumberOfLine++;
-                    }
-                }
-
-                // Line counter for the kbart file
-                Scanner kbart = new Scanner(tsvFile);
-                int lineCounter = 0;
-
-                // Create the header and add the total number of lines
-                Header kafkaHeader = new Header(tsvFile.getName(), totalNumberOfLine);
-
-                while (kbart.hasNextLine()) {
-                    String ligneKbart = kbart.nextLine();
-                    if (!ligneKbart.contains(kbartHeader)) {
-                        lineCounter++;
-
-                        // Create a new dto object, set its fields and send it to the topicProducer service
-                        String[] tsvElementsOnOneLine = ligneKbart.split("\t");
-                        LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);
-
-                        // Send the kbart line to the producer
-                        kafkaHeader.setCurrentLine(lineCounter);
-
-                        executor.submit(() -> {
-                            try {
-                                topicProducer.sendLigneKbart(ligneKbartDto, kafkaHeader);
-                                //log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
-                            } catch (JsonProcessingException e) {
-                                kafkaTransactionManager.rollback(status);
-                                throw new RuntimeException(e);
-                            }
-                        });
-                    }
-                }
-                // Send the "OK" end-of-processing message to the producer
-                kafkaTransactionManager.commit(status);
-                log.info(String.valueOf(status.isCompleted()));
-            } catch (IOException e) {
-                kafkaTransactionManager.rollback(status);
-                throw new IOException(e);
             } catch (IllegalFileFormatException | IllegalProviderException e) {
-                kafkaTransactionManager.rollback(status);
                 throw new RuntimeException(e);
             }
-            finally {
-                executor.shutdown();
-            }
+            // Compute the total number of lines
+            BufferedReader kbartTotalLines = new BufferedReader(new FileReader(tsvFile));
+            service.loadFile(tsvFile, kbartHeader, kbartTotalLines.lines().count() - 1);
         }
-    }
+        long endTime = System.currentTimeMillis();
+        double executionTime = (double) (endTime - startTime) / 1000;
+        log.info("Temps d'exécution : " + executionTime + " secondes");
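
Note on the line count above: the new run() never closes kbartTotalLines, and
"lines().count() - 1" assumes the file contains exactly one header line. A minimal
leak-free sketch under that same assumption (not part of the patch):

    // Count the data lines without leaking the file handle;
    // try-with-resources closes the stream opened by Files.lines().
    long totalNumberOfLine;
    try (java.util.stream.Stream<String> lines = java.nio.file.Files.lines(tsvFile.toPath())) {
        totalNumberOfLine = lines.count() - 1; // subtract the kbart header line
    }
    service.loadFile(tsvFile, kbartHeader, totalNumberOfLine);
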
-
-    /**
-     * Builds the dto.
-     *
-     * @param line the input line
-     * @return a DTO initialized with the data of the line
-     */
-    private LigneKbartDto constructDto(String[] line) {
-        LigneKbartDto kbartLineInDtoObject = new LigneKbartDto();
-        kbartLineInDtoObject.setPublication_title(line[0]);
-        kbartLineInDtoObject.setPrint_identifier(line[1]);
-        kbartLineInDtoObject.setOnline_identifier(line[2]);
-        kbartLineInDtoObject.setDate_first_issue_online(line[3]);
-        kbartLineInDtoObject.setNum_first_vol_online(Integer.getInteger(line[4]));
-        kbartLineInDtoObject.setNum_first_issue_online(Integer.getInteger(line[5]));
-        kbartLineInDtoObject.setDate_last_issue_online(line[6]);
-        kbartLineInDtoObject.setNum_last_vol_online(Integer.getInteger(line[7]));
-        kbartLineInDtoObject.setNum_last_issue_online(Integer.getInteger(line[8]));
-        kbartLineInDtoObject.setTitle_url(line[9]);
-        kbartLineInDtoObject.setFirst_author(line[10]);
-        kbartLineInDtoObject.setTitle_id(line[11]);
-        kbartLineInDtoObject.setEmbargo_info(line[12]);
-        kbartLineInDtoObject.setCoverage_depth(line[13]);
-        kbartLineInDtoObject.setNotes(line[14]);
-        kbartLineInDtoObject.setPublisher_name(line[15]);
-        kbartLineInDtoObject.setPublication_type(line[16]);
-        kbartLineInDtoObject.setDate_monograph_published_print(line[17]);
-        kbartLineInDtoObject.setDate_monograph_published_online(line[18]);
-        kbartLineInDtoObject.setMonograph_volume(Integer.getInteger(line[19]));
-        kbartLineInDtoObject.setMonograph_edition(line[20]);
-        kbartLineInDtoObject.setFirst_editor(line[21]);
-        kbartLineInDtoObject.setParent_publication_title_id(line[22]);
-        kbartLineInDtoObject.setPreceding_publication_title_id(line[23]);
-        kbartLineInDtoObject.setAccess_type(line[24]);
-        // Check for a best ppn already provided in the kbart file
-        if (line.length == 26) {
-            kbartLineInDtoObject.setBestPpn(line[25]);
-        }
-        return kbartLineInDtoObject;
     }
+
+
 }
diff --git a/src/main/java/fr/abes/kbart2kafka/configuration/SenderConfig.java b/src/main/java/fr/abes/kbart2kafka/configuration/KafkaConfig.java
similarity index 86%
rename from src/main/java/fr/abes/kbart2kafka/configuration/SenderConfig.java
rename to src/main/java/fr/abes/kbart2kafka/configuration/KafkaConfig.java
index 553327e..a1a9cdc 100644
--- a/src/main/java/fr/abes/kbart2kafka/configuration/SenderConfig.java
+++ b/src/main/java/fr/abes/kbart2kafka/configuration/KafkaConfig.java
@@ -10,13 +10,15 @@
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
 
 import java.util.HashMap;
 import java.util.Map;
 
 @Configuration
 @EnableKafka
-public class SenderConfig {
+@EnableTransactionManagement
+public class KafkaConfig {
 
     @Value("${spring.kafka.producer.bootstrap-servers}")
     private String bootstrapServers;
@@ -35,6 +37,9 @@ public Map<String, Object> producerConfigs() {
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
         props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout);
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(512*1024));
         return props;
     }
 
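Note on the producer settings added above: snappy compression, linger.ms=20 and a
512 KiB batch.size trade a few milliseconds of latency for larger compressed batches,
and batching happens per partition, so the benefit depends on many records landing on
the same partition. The @Transactional service below also needs a Kafka transaction
manager; this class imports KafkaTransactionManager, so it presumably declares a bean
along these lines (an assumption; that bean sits outside the hunks shown):

    // Hypothetical reconstruction: binds @Transactional to the transactional producer.
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
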
diff --git a/src/main/java/fr/abes/kbart2kafka/dto/Header.java b/src/main/java/fr/abes/kbart2kafka/dto/Header.java
index fbdbadc..d27ffa8 100644
--- a/src/main/java/fr/abes/kbart2kafka/dto/Header.java
+++ b/src/main/java/fr/abes/kbart2kafka/dto/Header.java
@@ -9,11 +9,11 @@ public class Header {
 
     private String fileName;
 
-    private int totalNumberOfLine;
+    private long totalNumberOfLine;
 
     private int currentLine;
 
-    public Header(String fileName, int totalNumberOfLine) {
+    public Header(String fileName, long totalNumberOfLine) {
         this.fileName = fileName;
         this.totalNumberOfLine = totalNumberOfLine;
     }
diff --git a/src/main/java/fr/abes/kbart2kafka/service/FileService.java b/src/main/java/fr/abes/kbart2kafka/service/FileService.java
new file mode 100644
index 0000000..2e77086
--- /dev/null
+++ b/src/main/java/fr/abes/kbart2kafka/service/FileService.java
@@ -0,0 +1,140 @@
+package fr.abes.kbart2kafka.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import fr.abes.kbart2kafka.dto.Header;
+import fr.abes.kbart2kafka.dto.LigneKbartDto;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Service
+@Slf4j
+public class FileService {
+
+    @Value("${topic.name.target.kbart}")
+    private String topicKbart;
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    private final ObjectMapper mapper;
+    ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    public FileService(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper mapper) {
+        this.kafkaTemplate = kafkaTemplate;
+        this.mapper = mapper;
+    }
+
+    @Transactional
+    public void loadFile(File fichier, String kbartHeader, long totalNumberOfLine) {
+        try {
+            executeMultiThread(fichier, kbartHeader, totalNumberOfLine);
+        } catch (IOException ex) {
+            log.error("Erreur dans la lecture du fichier");
+        }
+
+    }
+
+    private void executeMultiThread(File fichier, String kbartHeader, long totalNumberOfLine) throws IOException {
+        // Line counter for the kbart file
+        int lineCounter = 0;
+        // Create the header and add the total number of lines
+        Header kafkaHeader = new Header(fichier.getName(), totalNumberOfLine);
+        BufferedReader buff = new BufferedReader(new FileReader(fichier));
+        for (String ligneKbart : buff.lines().toList()) {
+            if (!ligneKbart.contains(kbartHeader)) {
+                lineCounter++;
+                // Create a new dto object, set its fields, and send it to Kafka
+                String[] tsvElementsOnOneLine = ligneKbart.split("\t");
+                LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);
+
+                // Send the kbart line to the producer
+                kafkaHeader.setCurrentLine(lineCounter);
+                executor.execute(() -> {
+                    try {
+                        List<org.apache.kafka.common.header.Header> headers = new ArrayList<>();
+                        headers.add(new RecordHeader("FileName", kafkaHeader.getFileName().getBytes(StandardCharsets.UTF_8)));
+                        headers.add(new RecordHeader("CurrentLine", String.valueOf(kafkaHeader.getCurrentLine()).getBytes(StandardCharsets.UTF_8)));
+                        headers.add(new RecordHeader("TotalLine", String.valueOf(kafkaHeader.getTotalNumberOfLine()).getBytes(StandardCharsets.UTF_8)));
+                        ProducerRecord<String, String> record = new ProducerRecord<>(topicKbart, new Random().nextInt(5), "", mapper.writeValueAsString(ligneKbartDto), headers);
+                        CompletableFuture<SendResult<String, String>> result = kafkaTemplate.executeInTransaction(kt -> kt.send(record));
+                        log.debug("Message envoyé : {}", mapper.writeValueAsString(result.get().getProducerRecord().value()));
+                    } catch (JsonProcessingException | ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+        }
+        executor.shutdown();
+        Message<String> message = MessageBuilder
+                .withPayload("OK")
+                .setHeader(KafkaHeaders.TOPIC, topicKbart)
+                .setHeader("FileName", kafkaHeader.getFileName())
+                .setHeader("CurrentLine", kafkaHeader.getCurrentLine())
+                .setHeader("TotalLine", kafkaHeader.getTotalNumberOfLine())
+                .build();
+
+        kafkaTemplate.send(message);
+    }
+
+    /**
+     * Builds the dto.
+     *
+     * @param line the input line
+     * @return a DTO initialized with the data of the line
+     */
+    private LigneKbartDto constructDto(String[] line) {
+        LigneKbartDto kbartLineInDtoObject = new LigneKbartDto();
+        kbartLineInDtoObject.setPublication_title(line[0]);
+        kbartLineInDtoObject.setPrint_identifier(line[1]);
+        kbartLineInDtoObject.setOnline_identifier(line[2]);
+        kbartLineInDtoObject.setDate_first_issue_online(line[3]);
+        kbartLineInDtoObject.setNum_first_vol_online(Integer.getInteger(line[4]));
+        kbartLineInDtoObject.setNum_first_issue_online(Integer.getInteger(line[5]));
+        kbartLineInDtoObject.setDate_last_issue_online(line[6]);
+        kbartLineInDtoObject.setNum_last_vol_online(Integer.getInteger(line[7]));
+        kbartLineInDtoObject.setNum_last_issue_online(Integer.getInteger(line[8]));
+        kbartLineInDtoObject.setTitle_url(line[9]);
+        kbartLineInDtoObject.setFirst_author(line[10]);
+        kbartLineInDtoObject.setTitle_id(line[11]);
+        kbartLineInDtoObject.setEmbargo_info(line[12]);
+        kbartLineInDtoObject.setCoverage_depth(line[13]);
+        kbartLineInDtoObject.setNotes(line[14]);
+        kbartLineInDtoObject.setPublisher_name(line[15]);
+        kbartLineInDtoObject.setPublication_type(line[16]);
+        kbartLineInDtoObject.setDate_monograph_published_print(line[17]);
+        kbartLineInDtoObject.setDate_monograph_published_online(line[18]);
+        kbartLineInDtoObject.setMonograph_volume(Integer.getInteger(line[19]));
+        kbartLineInDtoObject.setMonograph_edition(line[20]);
+        kbartLineInDtoObject.setFirst_editor(line[21]);
+        kbartLineInDtoObject.setParent_publication_title_id(line[22]);
+        kbartLineInDtoObject.setPreceding_publication_title_id(line[23]);
+        kbartLineInDtoObject.setAccess_type(line[24]);
+        // Check for a best ppn already provided in the kbart file
+        if (line.length == 26) {
+            kbartLineInDtoObject.setBestPpn(line[25]);
+        }
+        return kbartLineInDtoObject;
+    }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 91ed9e4..b085c46 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -8,8 +8,6 @@ application.name=@project.artifactId@
 application.version=@project.version@
 application.basedir=@webBaseDir@
 
-# HTTP server configuration
-server.port=8087
 # Log configuration
 log4j2.logdir=logs
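
Two review notes on FileService, with sketches that are not part of the patch.

First, executeMultiThread() calls executor.shutdown() and then immediately builds and
sends the "OK" end-of-file message; shutdown() only stops the pool from accepting new
tasks, it does not wait for the queued ones, so the marker can reach the topic before
the worker threads have sent every line. The hard-coded new Random().nextInt(5) also
assumes the target topic has at least five partitions. One way to wait, assuming a
bounded timeout is acceptable (the value here is an arbitrary choice):

    // requires: import java.util.concurrent.TimeUnit;
    executor.shutdown();
    try {
        // Block until every queued send task has finished, or give up after the timeout.
        if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
            log.warn("Timed out waiting for send tasks; sending the OK marker anyway");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // ...then build and send the "OK" message as in the patch.

Each task also wraps its send in kafkaTemplate.executeInTransaction(), which opens one
Kafka transaction per record on the worker thread; the @Transactional on loadFile() is
thread-bound and does not cover those sends, so a failing line rolls back only itself.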
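
Second, constructDto() fills the numeric columns with Integer.getInteger(...), which
looks up the JVM system property named by its argument and returns null when, as here,
no such property exists; it never parses the cell text, so every numeric kbart field
ends up null. The copy removed from Kbart2kafkaApplication had the same bug. A sketch
of a parsing helper (the name is hypothetical, not from the patch):

    // Parses a kbart cell into an Integer; blank or malformed cells become null.
    private static Integer parseIntOrNull(String cell) {
        try {
            return (cell == null || cell.isBlank()) ? null : Integer.valueOf(cell.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // usage: kbartLineInDtoObject.setNum_first_vol_online(parseIntOrNull(line[4]));

Relatedly, ligneKbart.split("\t") drops trailing empty columns, so line[24] can throw
ArrayIndexOutOfBoundsException on rows whose last cells are empty; split("\t", -1)
keeps them.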