Skip to content

Commit

Permalink
rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelQuetin committed Aug 28, 2023
1 parent 0a61f0c commit ec0a24b
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions src/main/java/fr/abes/kbart2kafka/Kbart2kafkaApplication.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package fr.abes.kbart2kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import fr.abes.kbart2kafka.dto.Header;
import fr.abes.kbart2kafka.dto.LigneKbartDto;
import fr.abes.kbart2kafka.exception.IllegalFileFormatException;
Expand Down Expand Up @@ -41,6 +40,7 @@ public static void main(String[] args) {
* @throws IOException Exception levée lorsque aucun fichier tsv n'a été trouvé.
*/
@Override
@Transactional // on spécifie la class qui fait rollback, par defaut c'est toutes les classes qui ne sont pas gérées càd : tout sauf IOException
public void run(String... args) throws IOException {

// Contrôle de la présence d'un paramètre au lancement de Kbart2kafkaApplication
Expand Down Expand Up @@ -70,7 +70,23 @@ public void run(String... args) throws IOException {

// Création du header et ajout du nombre total de lignes
Header kafkaHeader = new Header(tsvFile.getName(), totalNumberOfLine);
sendAllKbartToKafka(kbart, lineCounter, kafkaHeader);

while (kbart.hasNextLine()) {
String ligneKbart = kbart.nextLine();
if (!ligneKbart.contains(kbartHeader)) {
lineCounter ++;

// Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

// Envoi de la ligne kbart dans le producer
kafkaHeader.setCurrentLine(lineCounter);
topicProducer.sendLigneKbart(ligneKbartDto, kafkaHeader);
}
}
// Envoi du message de fin de traitement dans le producer "OK"
topicProducer.sendOk(kafkaHeader);
} catch (IOException e) {
throw new IOException(e);
} catch (IllegalFileFormatException | IllegalProviderException e) {
Expand All @@ -79,26 +95,6 @@ public void run(String... args) throws IOException {
}
}

@Transactional(rollbackFor = Exception.class) // on spécifie la class qui fait rollback, par defaut c'est toutes les classes qui ne sont pas gérées càd : tout sauf IOException
public void sendAllKbartToKafka(Scanner kbart, int lineCounter, Header kafkaHeader) throws JsonProcessingException {
while (kbart.hasNextLine()) {
String ligneKbart = kbart.nextLine();
if (!ligneKbart.contains(kbartHeader)) {
lineCounter++;

// Crée un nouvel objet dto, set les différentes parties et envoi au service topicProducer
String[] tsvElementsOnOneLine = ligneKbart.split("\t");
LigneKbartDto ligneKbartDto = constructDto(tsvElementsOnOneLine);

// Envoi de la ligne kbart dans le producer
kafkaHeader.setCurrentLine(lineCounter);
topicProducer.sendLigneKbart(ligneKbartDto, kafkaHeader);
}
}
// Envoi du message de fin de traitement dans le producer "OK"
topicProducer.sendOk(kafkaHeader);
}

/**
* Construction de la dto
*
Expand Down

0 comments on commit ec0a24b

Please sign in to comment.