diff --git a/archiver/src/main/java/org/student/messaging/MessageConsumer.java b/archiver/src/main/java/org/student/messaging/MessageConsumer.java index 4ca54f6..aac83c3 100644 --- a/archiver/src/main/java/org/student/messaging/MessageConsumer.java +++ b/archiver/src/main/java/org/student/messaging/MessageConsumer.java @@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.Properties; @@ -25,7 +27,8 @@ public MessageConsumer(String bootstrapServers, String groupId) { } public void consume(String topic) { - consumer.subscribe(Collections.singletonList(topic)); + TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Collections.singletonList(partition)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); diff --git a/archiver/src/main/java/org/student/messaging/MessageProducer.java b/archiver/src/main/java/org/student/messaging/MessageProducer.java index 72888c6..713183d 100644 --- a/archiver/src/main/java/org/student/messaging/MessageProducer.java +++ b/archiver/src/main/java/org/student/messaging/MessageProducer.java @@ -21,6 +21,6 @@ public MessageProducer(String bootstrapServers) { } public void send(String topic, byte[] message) { - producer.send(new ProducerRecord<>(topic, message)); + producer.send(new ProducerRecord<>(topic, 1, null, message)); } } diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java b/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java deleted file mode 100644 index 1452f95..0000000 --- a/core-back-end/src/main/java/org/student/messaging/KafkaConsumer.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.student.messaging; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class KafkaConsumer { - - @KafkaListener(topics = "archiver-topic", groupId = "core-file-garbage-group") - public void consume(byte[] message) { - System.out.println("Consumed message: " + new String(message)); - } - -} diff --git a/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java b/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java new file mode 100644 index 0000000..d1f8bc2 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/KafkaTopics.java @@ -0,0 +1,7 @@ +package org.student.messaging; + +public class KafkaTopics { + public static final String ARCHIVER_TOPIC = "archiver-topic"; + public static final String DATA_PROCESSOR_TOPIC ="data-processor-topic"; + public static final String ARTIFACT_PROCESSOR_TOPIC = "artifact-processor-topic"; +} diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java new file mode 100644 index 0000000..2e62776 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverListener.java @@ -0,0 +1,16 @@ +package org.student.messaging.archiver; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArchiverListener { + + @KafkaListener(id = "archiver-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARCHIVER_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java new file mode 100644 index 0000000..835c519 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/archiver/ArchiverProducer.java @@ -0,0 +1,21 @@ +package org.student.messaging.archiver; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArchiverProducer { + + private final KafkaTemplate template; + + ArchiverProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.ARCHIVER_TOPIC, 0, null, message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java new file mode 100644 index 0000000..dcf968d --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorListener.java @@ -0,0 +1,15 @@ +package org.student.messaging.artifact_processor; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class ArtifactProcessorListener { + @KafkaListener(id = "artifact-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java new file mode 100644 index 0000000..6c40a28 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/artifact_processor/ArtifactProcessorProducer.java @@ -0,0 +1,17 @@ +package org.student.messaging.artifact_processor; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.student.messaging.KafkaTopics; + +public class ArtifactProcessorProducer { + private final KafkaTemplate template; + + ArtifactProcessorProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, 0, null, message)); + } +} diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java new file mode 100644 index 0000000..0aed527 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorListener.java @@ -0,0 +1,15 @@ +package org.student.messaging.data_processor; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.TopicPartition; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class DataProcessorListener { + @KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.DATA_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group") + public void consume(byte[] message) { + System.out.println("Consumed message: " + new String(message)); + } + +} diff --git a/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java new file mode 100644 index 0000000..f69ec61 --- /dev/null +++ b/core-back-end/src/main/java/org/student/messaging/data_processor/DataProcessorProducer.java @@ -0,0 +1,19 @@ +package org.student.messaging.data_processor; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import org.student.messaging.KafkaTopics; + +@Service +public class DataProcessorProducer { + private final KafkaTemplate template; + + DataProcessorProducer(KafkaTemplate template) { + this.template = template; + } + + public void send(byte[] message) { + template.send(new ProducerRecord<>(KafkaTopics.DATA_PROCESSOR_TOPIC, 0, null, message)); + } +} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2a6900f..840e69e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -39,7 +39,7 @@ services: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CREATE_TOPICS: "archiver-topic:1:1,data-processor-topic:1:1,artifact-processor-topic:1:1" + KAFKA_CREATE_TOPICS: "archiver-topic:2:1,data-processor-topic:2:1,artifact-processor-topic:2:1" ports: - "9092:9092"