From 4f8a1683170079fabe359448f0c213c21b80d5fa Mon Sep 17 00:00:00 2001 From: kasean Date: Thu, 21 Dec 2023 14:08:09 +0300 Subject: [PATCH 01/11] new error from container --- containers-errors/Errors.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 containers-errors/Errors.md diff --git a/containers-errors/Errors.md b/containers-errors/Errors.md new file mode 100644 index 0000000..54d105b --- /dev/null +++ b/containers-errors/Errors.md @@ -0,0 +1 @@ +org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600) \ No newline at end of file From ccb2a65906927e96e94e791c430980d7980d5607 Mon Sep 17 00:00:00 2001 From: kasean Date: Thu, 21 Dec 2023 14:11:32 +0300 Subject: [PATCH 02/11] added kafka to docker-compose.yml #7 --- docker/docker-compose.yml | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 017930c..655f761 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -2,9 +2,6 @@ version: '3.7' services: keycloak: -# build: -# context: . -# dockerfile: Dockerfile image: jboss/keycloak environment: - KEYCLOAK_USER=admin @@ -28,5 +25,21 @@ services: volumes: - postgres_data:/var/lib/postgresql/data + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ports: + - "9092:9092" + volumes: postgres_data: From d304ab9722957bd9c3280e7cfd7b440ca8705337 Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 12:57:46 +0300 Subject: [PATCH 03/11] first version of messaging between core and archiver #9 --- archiver/build.gradle | 1 + .../student/messaging/MessageConsumer.java | 37 +++++++++++++++++++ .../student/messaging/MessageProducer.java | 26 +++++++++++++ core-back-end/build.gradle | 1 + .../org/student/messaging/KafkaConfig.java | 36 ++++++++++++++++++ .../org/student/messaging/KafkaConsumer.java | 14 +++++++ .../src/main/resources/application.yaml | 12 ++++++ docker/docker-compose.yml | 1 + 8 files changed, 128 insertions(+) create mode 100644 archiver/src/main/java/org/student/messaging/MessageConsumer.java create mode 100644 archiver/src/main/java/org/student/messaging/MessageProducer.java create mode 100644 core-back-end/src/main/java/org/student/messaging/KafkaConfig.java create mode 100644 core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java diff --git a/archiver/build.gradle b/archiver/build.gradle index aaac026..9f4c876 100644 --- a/archiver/build.gradle +++ b/archiver/build.gradle @@ -10,6 +10,7 @@ repositories { } dependencies { + implementation 'org.apache.kafka:kafka-clients:3.4.0' testImplementation platform('org.junit:junit-bom:5.9.1') testImplementation 'org.junit.jupiter:junit-jupiter' } diff --git a/archiver/src/main/java/org/student/messaging/MessageConsumer.java b/archiver/src/main/java/org/student/messaging/MessageConsumer.java new file mode 100644 index 0000000..12dbe23 --- /dev/null +++ b/archiver/src/main/java/org/student/messaging/MessageConsumer.java @@ -0,0 +1,37 @@ +package org.student.messaging; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class MessageConsumer { + + private final KafkaConsumer consumer; + + public MessageConsumer(String bootstrapServers, String groupId) { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + this.consumer = new KafkaConsumer<>(properties); + } + + public void consume(String topic) { + consumer.subscribe(Collections.singletonList(topic)); + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + System.out.println("Consumed message: " + new String(record.value())); + } + } + } +} diff --git a/archiver/src/main/java/org/student/messaging/MessageProducer.java b/archiver/src/main/java/org/student/messaging/MessageProducer.java new file mode 100644 index 0000000..72888c6 --- /dev/null +++ b/archiver/src/main/java/org/student/messaging/MessageProducer.java @@ -0,0 +1,26 @@ +package org.student.messaging; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +public class MessageProducer { + + private final KafkaProducer producer; + + public MessageProducer(String bootstrapServers) { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + this.producer = new KafkaProducer<>(properties); + } + + public void send(String topic, byte[] message) { + producer.send(new ProducerRecord<>(topic, message)); + } +} diff --git a/core-back-end/build.gradle b/core-back-end/build.gradle index c3a7835..f6cd087 100644 --- a/core-back-end/build.gradle +++ b/core-back-end/build.gradle @@ -23,6 +23,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-security' implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '18.0.2' implementation 'org.keycloak:keycloak-admin-client:23.0.3' + implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'io.projectreactor:reactor-test' } diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java b/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java new file mode 100644 index 0000000..237dcd3 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java @@ -0,0 +1,36 @@ +package org.student.messaging; + +import jakarta.validation.Valid; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + Map configProp = new HashMap<>(); + configProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + return new DefaultKafkaProducerFactory<>(configProp); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java b/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java new file mode 100644 index 0000000..1452f95 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java @@ -0,0 +1,14 @@ +package org.student.messaging; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumer { + + @KafkaListener(topics = "archiver-topic", groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/resources/application.yaml b/core-back-end/src/main/resources/application.yaml index 7bc0f13..079f1c7 100644 --- a/core-back-end/src/main/resources/application.yaml +++ b/core-back-end/src/main/resources/application.yaml @@ -12,3 +12,15 @@ keycloak: patterns: - /secured/* cors: true + +spring: + kafka: + bootstrap-servers: localhost:9092 + consumer: + group-id: core-file-garbage-group + auto-offset-reset: earlier + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer \ No newline at end of file diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 655f761..07f365e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -38,6 +38,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CREATE_TOPICS: "archiver-topic:1:1,data-processor-topic:1:1,artifact-processor-topic:1:1" ports: - "9092:9092" From 4567f0e68ea2bbf8294d5cb5f60669f6126bbd7a Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 13:03:11 +0300 Subject: [PATCH 04/11] removed infinity loop #9 --- .../src/main/java/org/student/messaging/MessageConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/archiver/src/main/java/org/student/messaging/MessageConsumer.java b/archiver/src/main/java/org/student/messaging/MessageConsumer.java index 12dbe23..46f8cc3 100644 --- a/archiver/src/main/java/org/student/messaging/MessageConsumer.java +++ b/archiver/src/main/java/org/student/messaging/MessageConsumer.java @@ -27,8 +27,8 @@ public MessageConsumer(String bootstrapServers, String groupId) { public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + if (!records.isEmpty()) { for (ConsumerRecord record : records) { System.out.println("Consumed message: " + new String(record.value())); } From 3990095de17c118589ac59920387577084d73574 Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 13:34:50 +0300 Subject: [PATCH 05/11] fixed error in kafka #9 --- .../src/main/java/org/student/Application.java | 5 ++++- .../org/student/messaging/MessageConsumer.java | 14 +++++++++----- docker/docker-compose.yml | 1 + 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/archiver/src/main/java/org/student/Application.java b/archiver/src/main/java/org/student/Application.java index de20c6c..291fbc5 100644 --- a/archiver/src/main/java/org/student/Application.java +++ b/archiver/src/main/java/org/student/Application.java @@ -1,7 +1,10 @@ package org.student; +import org.student.messaging.MessageConsumer; + public class Application { public static void main(String[] args) { - System.out.println("Hello world!"); + MessageConsumer consumer = new MessageConsumer("localhost:9092", "core-file-garbage-group"); + consumer.consume("archiver-topic"); } } \ No newline at end of file diff --git a/archiver/src/main/java/org/student/messaging/MessageConsumer.java b/archiver/src/main/java/org/student/messaging/MessageConsumer.java index 46f8cc3..4ca54f6 100644 --- a/archiver/src/main/java/org/student/messaging/MessageConsumer.java +++ b/archiver/src/main/java/org/student/messaging/MessageConsumer.java @@ -27,11 +27,15 @@ public MessageConsumer(String bootstrapServers, String groupId) { public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); - if (!records.isEmpty()) { - for (ConsumerRecord record : records) { - System.out.println("Consumed message: " + new String(record.value())); - } + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (!records.isEmpty()) { + for (ConsumerRecord record : records) { + System.out.println("Consumed message: " + new String(record.value())); + } + } else + System.out.println("No messages"); } + } } diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 07f365e..2a6900f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -34,6 +34,7 @@ services: image: confluentinc/cp-kafka:latest depends_on: - zookeeper + restart: on-failure environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 From 81c78ec25be5447a09639a1a0b3eeff24ce6e9cb Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 14:16:36 +0300 Subject: [PATCH 06/11] core back-end messaging #10 --- .../student/messaging/MessageConsumer.java | 5 ++++- .../student/messaging/MessageProducer.java | 2 +- .../org/student/messaging/KafkaConsumer.java | 14 ------------- .../org/student/messaging/KafkaTopics.java | 7 +++++++ .../messaging/archiver/ArchiverListener.java | 16 ++++++++++++++ .../messaging/archiver/ArchiverProducer.java | 21 +++++++++++++++++++ .../ArtifactProcessorListener.java | 15 +++++++++++++ .../ArtifactProcessorProducer.java | 17 +++++++++++++++ .../data_processor/DataProcessorListener.java | 15 +++++++++++++ .../data_processor/DataProcessorProducer.java | 19 +++++++++++++++++ docker/docker-compose.yml | 2 +- 11 files changed, 116 insertions(+), 17 deletions(-) delete mode 100644 core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java create mode 100644 core-back-end/src/main/java/org/student/messaging/KafkaTopics.java create mode 100644 core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java create mode 100644 core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java create mode 100644 core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java create mode 100644 core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java create mode 100644 core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java create mode 100644 core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java diff --git a/archiver/src/main/java/org/student/messaging/MessageConsumer.java b/archiver/src/main/java/org/student/messaging/MessageConsumer.java index 4ca54f6..aac83c3 100644 --- a/archiver/src/main/java/org/student/messaging/MessageConsumer.java +++ b/archiver/src/main/java/org/student/messaging/MessageConsumer.java @@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.Properties; @@ -25,7 +27,8 @@ public MessageConsumer(String bootstrapServers, String groupId) { } public void consume(String topic) { - consumer.subscribe(Collections.singletonList(topic)); + TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Collections.singletonList(partition)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); diff --git a/archiver/src/main/java/org/student/messaging/MessageProducer.java b/archiver/src/main/java/org/student/messaging/MessageProducer.java index 72888c6..713183d 100644 --- a/archiver/src/main/java/org/student/messaging/MessageProducer.java +++ b/archiver/src/main/java/org/student/messaging/MessageProducer.java @@ -21,6 +21,6 @@ public MessageProducer(String bootstrapServers) { } public void send(String topic, byte[] message) { - producer.send(new ProducerRecord<>(topic, message)); + producer.send(new ProducerRecord<>(topic, 1, null, message)); } } diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java b/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java deleted file mode 100644 index 1452f95..0000000 --- a/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.student.messaging; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class KafkaConsumer { - - @KafkaListener(topics = "archiver-topic", groupId = "core-file-garbage-group") - public void consume(byte[] message) { - System.out.println("Consumed message: " + new String(message)); - } - -} diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java b/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java new file mode 100644 index 0000000..d1f8bc2 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java @@ -0,0 +1,7 @@ +package org.student.messaging; + +public class KafkaTopics { + public static final String ARCHIVER_TOPIC = "archiver-topic"; + public static final String DATA_PROCESSOR_TOPIC ="data-processor-topic"; + public static final String ARTIFACT_PROCESSOR_TOPIC = "artifact-processor-topic"; +} diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java new file mode 100644 index 0000000..2e62776 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java @@ -0,0 +1,16 @@ +package org.student.messaging.archiver; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArchiverListener { + + @KafkaListener(id = "archiver-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARCHIVER_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java new file mode 100644 index 0000000..835c519 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java @@ -0,0 +1,21 @@ +package org.student.messaging.archiver; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArchiverProducer { + + private final KafkaTemplate template; + + ArchiverProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.ARCHIVER_TOPIC, 0, null, message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java new file mode 100644 index 0000000..dcf968d --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java @@ -0,0 +1,15 @@ +package org.student.messaging.artifact_processor; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArtifactProcessorListener { + @KafkaListener(id = "artifact-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java new file mode 100644 index 0000000..6c40a28 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java @@ -0,0 +1,17 @@ +package org.student.messaging.artifact_processor; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.student.messaging.KafkaTopics; + +public class ArtifactProcessorProducer { + private final KafkaTemplate template; + + ArtifactProcessorProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, 0, null, message)); + } +} diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java new file mode 100644 index 0000000..0aed527 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java @@ -0,0 +1,15 @@ +package org.student.messaging.data_processor; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class DataProcessorListener { + @KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.DATA_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java new file mode 100644 index 0000000..f69ec61 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java @@ -0,0 +1,19 @@ +package org.student.messaging.data_processor; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class DataProcessorProducer { + private final KafkaTemplate template; + + DataProcessorProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.DATA_PROCESSOR_TOPIC, 0, null, message)); + } +} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2a6900f..840e69e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -39,7 +39,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CREATE_TOPICS: "archiver-topic:1:1,data-processor-topic:1:1,artifact-processor-topic:1:1" + KAFKA_CREATE_TOPICS: "archiver-topic:2:1,data-processor-topic:2:1,artifact-processor-topic:2:1" ports: - "9092:9092" From 719c3af26077ac8285aeb887135b865b7c43b052 Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 14:31:50 +0300 Subject: [PATCH 07/11] artifact processor messaging #12 --- artifact-processor/build.gradle | 1 + .../student/{Main.java => Application.java} | 2 +- .../student/messaging/MessageConsumer.java | 43 +++++++++++++++++++ .../student/messaging/MessageProducer.java | 26 +++++++++++ .../student/{Main.java => Application.java} | 2 +- 5 files changed, 72 insertions(+), 2 deletions(-) rename artifact-processor/src/main/java/org/student/{Main.java => Application.java} (79%) create mode 100644 artifact-processor/src/main/java/org/student/messaging/MessageConsumer.java create mode 100644 artifact-processor/src/main/java/org/student/messaging/MessageProducer.java rename data-processor/src/main/java/org/student/{Main.java => Application.java} (79%) diff --git a/artifact-processor/build.gradle b/artifact-processor/build.gradle index aaac026..9f4c876 100644 --- a/artifact-processor/build.gradle +++ b/artifact-processor/build.gradle @@ -10,6 +10,7 @@ repositories { } dependencies { + implementation 'org.apache.kafka:kafka-clients:3.4.0' testImplementation platform('org.junit:junit-bom:5.9.1') testImplementation 'org.junit.jupiter:junit-jupiter' } diff --git a/artifact-processor/src/main/java/org/student/Main.java b/artifact-processor/src/main/java/org/student/Application.java similarity index 79% rename from artifact-processor/src/main/java/org/student/Main.java rename to artifact-processor/src/main/java/org/student/Application.java index 5fd4e5a..de20c6c 100644 --- a/artifact-processor/src/main/java/org/student/Main.java +++ b/artifact-processor/src/main/java/org/student/Application.java @@ -1,6 +1,6 @@ package org.student; -public class Main { +public class Application { public static void main(String[] args) { System.out.println("Hello world!"); } diff --git a/artifact-processor/src/main/java/org/student/messaging/MessageConsumer.java b/artifact-processor/src/main/java/org/student/messaging/MessageConsumer.java new file mode 100644 index 0000000..d235831 --- /dev/null +++ b/artifact-processor/src/main/java/org/student/messaging/MessageConsumer.java @@ -0,0 +1,43 @@ +package org.student.messaging; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class MessageConsumer { + + private final KafkaConsumer consumer; + + public MessageConsumer(String bootstrapServers, String groupId) { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + this.consumer = new KafkaConsumer<>(properties); + } + + public void consume(String topic) { + TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Collections.singletonList(partition)); + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (!records.isEmpty()) { + for (ConsumerRecord record : records) { + System.out.println("Consumed message: " + new String(record.value())); + } + } else + System.out.println("No messages"); + } + + } +} diff --git a/artifact-processor/src/main/java/org/student/messaging/MessageProducer.java b/artifact-processor/src/main/java/org/student/messaging/MessageProducer.java new file mode 100644 index 0000000..713183d --- /dev/null +++ b/artifact-processor/src/main/java/org/student/messaging/MessageProducer.java @@ -0,0 +1,26 @@ +package org.student.messaging; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +public class MessageProducer { + + private final KafkaProducer producer; + + public MessageProducer(String bootstrapServers) { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + this.producer = new KafkaProducer<>(properties); + } + + public void send(String topic, byte[] message) { + producer.send(new ProducerRecord<>(topic, 1, null, message)); + } +} diff --git a/data-processor/src/main/java/org/student/Main.java b/data-processor/src/main/java/org/student/Application.java similarity index 79% rename from data-processor/src/main/java/org/student/Main.java rename to data-processor/src/main/java/org/student/Application.java index 5fd4e5a..de20c6c 100644 --- a/data-processor/src/main/java/org/student/Main.java +++ b/data-processor/src/main/java/org/student/Application.java @@ -1,6 +1,6 @@ package org.student; -public class Main { +public class Application { public static void main(String[] args) { System.out.println("Hello world!"); } From 97a1356853148ef38df8050335706c2ae19ebf7a Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 14:50:28 +0300 Subject: [PATCH 08/11] data processor messaging #11 --- core-back-end/build.gradle | 2 +- data-processor/build.gradle | 24 +++++++++---- .../main/java/org/student/Application.java | 6 +++- .../org/student/messaging/KafkaConfig.java | 35 +++++++++++++++++++ .../student/messaging/MessageConsumer.java | 13 +++++++ .../student/messaging/MessageProducer.java | 18 ++++++++++ .../src/main/resources/application.yaml | 11 ++++++ 7 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 data-processor/src/main/java/org/student/messaging/KafkaConfig.java create mode 100644 data-processor/src/main/java/org/student/messaging/MessageConsumer.java create mode 100644 data-processor/src/main/java/org/student/messaging/MessageProducer.java create mode 100644 data-processor/src/main/resources/application.yaml diff --git a/core-back-end/build.gradle b/core-back-end/build.gradle index f6cd087..ecd6e0c 100644 --- a/core-back-end/build.gradle +++ b/core-back-end/build.gradle @@ -21,7 +21,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-security' - implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '18.0.2' + implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '23.0.3' implementation 'org.keycloak:keycloak-admin-client:23.0.3' implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/data-processor/build.gradle b/data-processor/build.gradle index aaac026..a357639 100644 --- a/data-processor/build.gradle +++ b/data-processor/build.gradle @@ -1,19 +1,29 @@ plugins { id 'java' + id 'org.springframework.boot' version '3.2.0' + id 'io.spring.dependency-management' version '1.1.4' } -group = 'org.student' -version = '1.0-SNAPSHOT' +group = "org.student" +version = "1.0-SNAPSHOT" -repositories { - mavenCentral() +java { + sourceCompatibility = '17' +} + +configurations { + compileOnly { + extendsFrom annotationProcessor + } } dependencies { - testImplementation platform('org.junit:junit-bom:5.9.1') - testImplementation 'org.junit.jupiter:junit-jupiter' + implementation 'org.springframework.boot:spring-boot-starter:3.2.0' + implementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation 'io.projectreactor:reactor-test' } -test { +tasks.named('test') { useJUnitPlatform() } \ No newline at end of file diff --git a/data-processor/src/main/java/org/student/Application.java b/data-processor/src/main/java/org/student/Application.java index de20c6c..3070c43 100644 --- a/data-processor/src/main/java/org/student/Application.java +++ b/data-processor/src/main/java/org/student/Application.java @@ -1,7 +1,11 @@ package org.student; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication public class Application { public static void main(String[] args) { - System.out.println("Hello world!"); + SpringApplication.run(Application.class, args); } } \ No newline at end of file diff --git a/data-processor/src/main/java/org/student/messaging/KafkaConfig.java b/data-processor/src/main/java/org/student/messaging/KafkaConfig.java new file mode 100644 index 0000000..d9109ea --- /dev/null +++ b/data-processor/src/main/java/org/student/messaging/KafkaConfig.java @@ -0,0 +1,35 @@ +package org.student.messaging; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + Map configProp = new HashMap<>(); + configProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + return new DefaultKafkaProducerFactory<>(configProp); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/data-processor/src/main/java/org/student/messaging/MessageConsumer.java b/data-processor/src/main/java/org/student/messaging/MessageConsumer.java new file mode 100644 index 0000000..d436b99 --- /dev/null +++ b/data-processor/src/main/java/org/student/messaging/MessageConsumer.java @@ -0,0 +1,13 @@ +package org.student.messaging; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; + +@Service +public class MessageConsumer { + @KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = "artifact-processor-topic", partitions = {"0"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } +} diff --git a/data-processor/src/main/java/org/student/messaging/MessageProducer.java b/data-processor/src/main/java/org/student/messaging/MessageProducer.java new file mode 100644 index 0000000..3bc7d80 --- /dev/null +++ b/data-processor/src/main/java/org/student/messaging/MessageProducer.java @@ -0,0 +1,18 @@ +package org.student.messaging; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class MessageProducer { + private final KafkaTemplate template; + + MessageProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>("artifact-processor-topic", 1, null, message)); + } +} diff --git a/data-processor/src/main/resources/application.yaml b/data-processor/src/main/resources/application.yaml new file mode 100644 index 0000000..902cc5c --- /dev/null +++ b/data-processor/src/main/resources/application.yaml @@ -0,0 +1,11 @@ +spring: + kafka: + bootstrap-servers: localhost:9092 + consumer: + group-id: core-file-garbage-group + auto-offset-reset: earlier + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer \ No newline at end of file From a0aecda6d9692aeea2eb495e3b74c95841d1287f Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 14:53:49 +0300 Subject: [PATCH 09/11] fix build #11 --- core-back-end/build.gradle | 5 +++-- .../src/main/java/org/student/messaging/KafkaConfig.java | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core-back-end/build.gradle b/core-back-end/build.gradle index ecd6e0c..c672396 100644 --- a/core-back-end/build.gradle +++ b/core-back-end/build.gradle @@ -21,8 +21,8 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-security' - implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '23.0.3' - implementation 'org.keycloak:keycloak-admin-client:23.0.3' +// implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '23.0.3' +// implementation 'org.keycloak:keycloak-admin-client:23.0.3' implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'io.projectreactor:reactor-test' @@ -31,3 +31,4 @@ dependencies { tasks.named('test') { useJUnitPlatform() } + diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java b/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java index 237dcd3..d9109ea 100644 --- a/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java +++ b/core-back-end/src/main/java/org/student/messaging/KafkaConfig.java @@ -1,6 +1,5 @@ package org.student.messaging; -import jakarta.validation.Valid; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; From 46e1ae6b3993a3b5fae5da52b569675f48556ace Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 15:05:18 +0300 Subject: [PATCH 10/11] api module implemented #8 --- core-back-end/build.gradle | 3 +++ .../data_processor/DataProcessorListener.java | 2 +- .../data_processor/DataProcessorProducer.java | 2 +- data-processor/build.gradle | 2 ++ .../student/messaging/MessageConsumer.java | 3 ++- .../student/messaging/MessageProducer.java | 3 ++- messaging-api/build.gradle | 19 +++++++++++++++++++ .../messaging/topics}/KafkaTopics.java | 3 ++- settings.gradle | 1 + 9 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 messaging-api/build.gradle rename {core-back-end/src/main/java/org/student/messaging => messaging-api/src/main/java/org/student/messaging/topics}/KafkaTopics.java (86%) diff --git a/core-back-end/build.gradle b/core-back-end/build.gradle index c672396..c80a545 100644 --- a/core-back-end/build.gradle +++ b/core-back-end/build.gradle @@ -18,6 +18,9 @@ configurations { } dependencies { + + implementation project(':messaging-api') + implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-security' diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java index 0aed527..1021682 100644 --- a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java @@ -3,7 +3,7 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.stereotype.Service; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; @Service public class DataProcessorListener { diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java index f69ec61..df0ba77 100644 --- a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java @@ -3,7 +3,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; @Service public class DataProcessorProducer { diff --git a/data-processor/build.gradle b/data-processor/build.gradle index a357639..d02f5ae 100644 --- a/data-processor/build.gradle +++ b/data-processor/build.gradle @@ -18,6 +18,8 @@ configurations { } dependencies { + implementation project(':messaging-api') + implementation 'org.springframework.boot:spring-boot-starter:3.2.0' implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/data-processor/src/main/java/org/student/messaging/MessageConsumer.java b/data-processor/src/main/java/org/student/messaging/MessageConsumer.java index d436b99..ac3c53a 100644 --- a/data-processor/src/main/java/org/student/messaging/MessageConsumer.java +++ b/data-processor/src/main/java/org/student/messaging/MessageConsumer.java @@ -3,10 +3,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.stereotype.Service; +import org.student.messaging.topics.KafkaTopics; @Service public class MessageConsumer { - @KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = "artifact-processor-topic", partitions = {"0"}), groupId = "core-file-garbage-group") + @KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.DATA_PROCESSOR_TOPIC, partitions = {"0"}), groupId = "core-file-garbage-group") public void consume(byte[] message) { System.out.println("Consumed message: " + new String(message)); } diff --git a/data-processor/src/main/java/org/student/messaging/MessageProducer.java b/data-processor/src/main/java/org/student/messaging/MessageProducer.java index 3bc7d80..ae1819a 100644 --- a/data-processor/src/main/java/org/student/messaging/MessageProducer.java +++ b/data-processor/src/main/java/org/student/messaging/MessageProducer.java @@ -3,6 +3,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; +import org.student.messaging.topics.KafkaTopics; @Service public class MessageProducer { @@ -13,6 +14,6 @@ public class MessageProducer { } public void send(byte[] message) { - template.send(new ProducerRecord<>("artifact-processor-topic", 1, null, message)); + template.send(new ProducerRecord<>(KafkaTopics.DATA_PROCESSOR_TOPIC, 1, null, message)); } } diff --git a/messaging-api/build.gradle b/messaging-api/build.gradle new file mode 100644 index 0000000..f38c271 --- /dev/null +++ b/messaging-api/build.gradle @@ -0,0 +1,19 @@ +plugins { + id("java") +} + +group = "org.student" +version = "1.0-SNAPSHOT" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation(platform("org.junit:junit-bom:5.9.1")) + testImplementation("org.junit.jupiter:junit-jupiter") +} + +tasks.test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java b/messaging-api/src/main/java/org/student/messaging/topics/KafkaTopics.java similarity index 86% rename from core-back-end/src/main/java/org/student/messaging/KafkaTopics.java rename to messaging-api/src/main/java/org/student/messaging/topics/KafkaTopics.java index d1f8bc2..244d53d 100644 --- a/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java +++ b/messaging-api/src/main/java/org/student/messaging/topics/KafkaTopics.java @@ -1,7 +1,8 @@ -package org.student.messaging; +package org.student.messaging.topics; public class KafkaTopics { public static final String ARCHIVER_TOPIC = "archiver-topic"; public static final String DATA_PROCESSOR_TOPIC ="data-processor-topic"; public static final String ARTIFACT_PROCESSOR_TOPIC = "artifact-processor-topic"; } + diff --git a/settings.gradle b/settings.gradle index c161e9b..4405bff 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,4 +3,5 @@ include 'core-back-end' include 'archiver' include 'artifact-processor' include 'data-processor' +include 'messaging-api' From db3648aadc48c6f4bae3537e9bdea0dc88eef9de Mon Sep 17 00:00:00 2001 From: kasean Date: Fri, 29 Dec 2023 15:09:22 +0300 Subject: [PATCH 11/11] fix imports #8 --- .../java/org/student/messaging/archiver/ArchiverListener.java | 2 +- .../java/org/student/messaging/archiver/ArchiverProducer.java | 2 +- .../messaging/artifact_processor/ArtifactProcessorListener.java | 2 +- .../messaging/artifact_processor/ArtifactProcessorProducer.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java index 2e62776..f646c66 100644 --- a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java @@ -3,7 +3,7 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.stereotype.Service; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; @Service public class ArchiverListener { diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java index 835c519..ae1be63 100644 --- a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java @@ -3,7 +3,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; @Service public class ArchiverProducer { diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java index dcf968d..5d326dc 100644 --- a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java @@ -3,7 +3,7 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.stereotype.Service; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; @Service public class ArtifactProcessorListener { diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java index 6c40a28..06a392a 100644 --- a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java @@ -2,7 +2,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; -import org.student.messaging.KafkaTopics; +import org.student.messaging.topics.KafkaTopics; public class ArtifactProcessorProducer { private final KafkaTemplate template;