Skip to content

Commit

Permalink
Merge pull request #23 from Kasean/milestones/messaging
Browse files Browse the repository at this point in the history
Milestones/messaging - Done
  • Loading branch information
Kasean authored Dec 29, 2023
2 parents 369bb7b + 0c608ea commit 4ecc3d9
Show file tree
Hide file tree
Showing 29 changed files with 459 additions and 21 deletions.
1 change: 1 addition & 0 deletions archiver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
5 changes: 4 additions & 1 deletion archiver/src/main/java/org/student/Application.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.student;

import org.student.messaging.MessageConsumer;

public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");
MessageConsumer consumer = new MessageConsumer("localhost:9092", "core-file-garbage-group");
consumer.consume("archiver-topic");
}
}
44 changes: 44 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

private final KafkaConsumer<String, byte[]> consumer;

public MessageConsumer(String bootstrapServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);
}

public void consume(String topic) {
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));

while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
}
} else
System.out.println("No messages");
}

}
}
26 changes: 26 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageProducer {

private final KafkaProducer<String, byte[]> producer;

public MessageProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
this.producer = new KafkaProducer<>(properties);
}

public void send(String topic, byte[] message) {
producer.send(new ProducerRecord<>(topic, 1, null, message));
}
}
1 change: 1 addition & 0 deletions artifact-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.student;

public class Main {
public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

private final KafkaConsumer<String, byte[]> consumer;

public MessageConsumer(String bootstrapServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);
}

public void consume(String topic) {
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));

while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
}
} else
System.out.println("No messages");
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageProducer {

private final KafkaProducer<String, byte[]> producer;

public MessageProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
this.producer = new KafkaProducer<>(properties);
}

public void send(String topic, byte[] message) {
producer.send(new ProducerRecord<>(topic, 1, null, message));
}
}
1 change: 1 addition & 0 deletions containers-errors/Errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
9 changes: 7 additions & 2 deletions core-back-end/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ configurations {
}

dependencies {

implementation project(':messaging-api')

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '18.0.2'
implementation 'org.keycloak:keycloak-admin-client:23.0.3'
// implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '23.0.3'
// implementation 'org.keycloak:keycloak-admin-client:23.0.3'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}

tasks.named('test') {
useJUnitPlatform()
}

35 changes: 35 additions & 0 deletions core-back-end/src/main/java/org/student/messaging/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> configProp = new HashMap<>();
configProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new DefaultKafkaProducerFactory<>(configProp);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.student.messaging.archiver;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class ArchiverListener {

@KafkaListener(id = "archiver-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARCHIVER_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group")
public void consume(byte[] message) {
System.out.println("Consumed message: " + new String(message));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.student.messaging.archiver;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class ArchiverProducer {

private final KafkaTemplate<String, byte[]> template;

ArchiverProducer(KafkaTemplate<String, byte[]> template) {
this.template = template;
}

public void send(byte[] message) {
template.send(new ProducerRecord<>(KafkaTopics.ARCHIVER_TOPIC, 0, null, message));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.student.messaging.artifact_processor;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class ArtifactProcessorListener {
@KafkaListener(id = "artifact-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group")
public void consume(byte[] message) {
System.out.println("Consumed message: " + new String(message));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.student.messaging.artifact_processor;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.student.messaging.topics.KafkaTopics;

public class ArtifactProcessorProducer {
private final KafkaTemplate<String, byte[]> template;

ArtifactProcessorProducer(KafkaTemplate<String, byte[]> template) {
this.template = template;
}

public void send(byte[] message) {
template.send(new ProducerRecord<>(KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, 0, null, message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.student.messaging.data_processor;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class DataProcessorListener {
@KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.DATA_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group")
public void consume(byte[] message) {
System.out.println("Consumed message: " + new String(message));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.student.messaging.data_processor;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class DataProcessorProducer {
private final KafkaTemplate<String, byte[]> template;

DataProcessorProducer(KafkaTemplate<String, byte[]> template) {
this.template = template;
}

public void send(byte[] message) {
template.send(new ProducerRecord<>(KafkaTopics.DATA_PROCESSOR_TOPIC, 0, null, message));
}
}
12 changes: 12 additions & 0 deletions core-back-end/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ keycloak:
patterns:
- /secured/*
cors: true

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: core-file-garbage-group
auto-offset-reset: earlier
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Loading

0 comments on commit 4ecc3d9

Please sign in to comment.