Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Milestones/messaging - Done #23

Merged
merged 17 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions archiver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
// Kafka Java client used by this module's MessageProducer/MessageConsumer.
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
5 changes: 4 additions & 1 deletion archiver/src/main/java/org/student/Application.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.student;

import org.student.messaging.MessageConsumer;

// Entry point for the archiver service: starts a Kafka consumer pinned to
// the archiver topic and polls it forever (consume() never returns).
public class Application {
public static void main(String[] args) {
// NOTE(review): leftover bootstrap println — presumably safe to remove; confirm.
System.out.println("Hello world!");
// Group id matches the "core-file-garbage-group" used by the core-back-end listeners.
MessageConsumer consumer = new MessageConsumer("localhost:9092", "core-file-garbage-group");
// Blocks forever: MessageConsumer.consume runs an infinite poll loop.
consumer.consume("archiver-topic");
}
}
44 changes: 44 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer for the archiver service: pins itself to partition 0
 * of a topic and polls forever, printing each byte[] payload as UTF-8 text.
 *
 * <p>Not thread-safe: KafkaConsumer must be used from a single thread.
 */
public class MessageConsumer {

    private final KafkaConsumer<String, byte[]> consumer;

    /**
     * @param bootstrapServers Kafka broker list, e.g. "localhost:9092"
     * @param groupId          consumer group id (offsets are committed per group)
     */
    public MessageConsumer(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Polls partition 0 of {@code topic} in an infinite loop; never returns.
     *
     * @param topic topic whose partition 0 is consumed
     */
    public void consume(String topic) {
        // Explicit assignment (no group rebalancing): this consumer reads
        // exactly partition 0 of the topic.
        TopicPartition partition = new TopicPartition(topic, 0);
        consumer.assign(Collections.singletonList(partition));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Explicit charset: new String(byte[]) used the platform
                    // default charset before Java 18, which is not portable.
                    System.out.println("Consumed message: " + new String(record.value(), StandardCharsets.UTF_8));
                }
            } else {
                // Braced else (original had a dangling unbraced statement).
                System.out.println("No messages");
            }
        }
    }
}
26 changes: 26 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer for the archiver service: publishes raw byte[]
 * payloads (no key) to partition 0 of a topic.
 */
public class MessageProducer {

    private final KafkaProducer<String, byte[]> producer;

    /**
     * @param bootstrapServers Kafka broker list, e.g. "localhost:9092"
     */
    public MessageProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Asynchronously sends one message to partition 0 of {@code topic}.
     *
     * @param topic   destination topic
     * @param message raw payload bytes
     */
    public void send(String topic, byte[] message) {
        // BUGFIX: partition 0, not 1. MessageConsumer in this module assigns
        // itself partition 0 of the topic, so records published to partition 1
        // were never consumed.
        producer.send(new ProducerRecord<>(topic, 0, null, message));
    }
}
1 change: 1 addition & 0 deletions artifact-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.student;

public class Main {
public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer for the artifact-processor service: pins itself to
 * partition 0 of a topic and polls forever, printing each byte[] payload as
 * UTF-8 text.
 *
 * <p>Not thread-safe: KafkaConsumer must be used from a single thread.
 */
public class MessageConsumer {

    private final KafkaConsumer<String, byte[]> consumer;

    /**
     * @param bootstrapServers Kafka broker list, e.g. "localhost:9092"
     * @param groupId          consumer group id (offsets are committed per group)
     */
    public MessageConsumer(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Polls partition 0 of {@code topic} in an infinite loop; never returns.
     *
     * @param topic topic whose partition 0 is consumed
     */
    public void consume(String topic) {
        // Explicit assignment (no group rebalancing): this consumer reads
        // exactly partition 0 of the topic.
        TopicPartition partition = new TopicPartition(topic, 0);
        consumer.assign(Collections.singletonList(partition));

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Explicit charset: new String(byte[]) used the platform
                    // default charset before Java 18, which is not portable.
                    System.out.println("Consumed message: " + new String(record.value(), StandardCharsets.UTF_8));
                }
            } else {
                // Braced else (original had a dangling unbraced statement).
                System.out.println("No messages");
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer for the artifact-processor service: publishes raw
 * byte[] payloads (no key) to partition 0 of a topic.
 */
public class MessageProducer {

    private final KafkaProducer<String, byte[]> producer;

    /**
     * @param bootstrapServers Kafka broker list, e.g. "localhost:9092"
     */
    public MessageProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Asynchronously sends one message to partition 0 of {@code topic}.
     *
     * @param topic   destination topic
     * @param message raw payload bytes
     */
    public void send(String topic, byte[] message) {
        // BUGFIX: partition 0, not 1. MessageConsumer in this module assigns
        // itself partition 0 of the topic, so records published to partition 1
        // were never consumed.
        producer.send(new ProducerRecord<>(topic, 0, null, message));
    }
}
1 change: 1 addition & 0 deletions containers-errors/Errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)

Note: 1195725856 is 0x47455420, the ASCII bytes "GET " read as a 32-bit length — this error means a plain HTTP request (not a Kafka client) hit the broker's Kafka port, likely a misconfigured port or URL.
9 changes: 7 additions & 2 deletions core-back-end/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ configurations {
}

dependencies {

// Shared Kafka topic constants (KafkaTopics) live in this module.
implementation project(':messaging-api')

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-security'
// NOTE(review): adapter pinned to 18.0.2 while the admin client below is
// 23.0.3, and the 23.0.3 coordinates are commented out just underneath —
// the mixed Keycloak versions look unintended; confirm which is wanted.
implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '18.0.2'
implementation 'org.keycloak:keycloak-admin-client:23.0.3'
// implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '23.0.3'
// implementation 'org.keycloak:keycloak-admin-client:23.0.3'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}

tasks.named('test') {
useJUnitPlatform()
}

35 changes: 35 additions & 0 deletions core-back-end/src/main/java/org/student/messaging/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer wiring for the core back end: a String-key / byte[]-value
 * producer factory plus the KafkaTemplate built on top of it.
 */
@Configuration
public class KafkaConfig {

    // Broker list, injected from spring.kafka.bootstrap-servers.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Producer factory configured for String keys and raw byte[] values. */
    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> producerProps = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /** Template used by the *Producer services to publish messages. */
    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.student.messaging.archiver;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

import java.nio.charset.StandardCharsets;

/**
 * Consumes archiver-topic messages and logs them as UTF-8 text.
 */
@Service
public class ArchiverListener {

    /**
     * Handles one raw payload from the archiver topic.
     *
     * @param message raw payload bytes, decoded as UTF-8 for logging
     */
    // BUGFIX: partition "0", not "1". ArchiverProducer publishes to partition 0,
    // so a listener pinned to partition 1 never received anything.
    @KafkaListener(id = "archiver-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARCHIVER_TOPIC, partitions = {"0"}), groupId = "core-file-garbage-group")
    public void consume(byte[] message) {
        // Explicit charset: new String(byte[]) used the platform default pre-Java 18.
        System.out.println("Consumed message: " + new String(message, StandardCharsets.UTF_8));
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.student.messaging.archiver;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

/**
 * Publishes raw byte[] payloads (no key) to partition 0 of the archiver topic.
 */
@Service
public class ArchiverProducer {

    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    ArchiverProducer(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Asynchronously sends one message to partition 0 of the archiver topic. */
    public void send(byte[] message) {
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(KafkaTopics.ARCHIVER_TOPIC, 0, null, message);
        kafkaTemplate.send(record);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.student.messaging.artifact_processor;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

import java.nio.charset.StandardCharsets;

/**
 * Consumes artifact-processor-topic messages and logs them as UTF-8 text.
 */
@Service
public class ArtifactProcessorListener {

    /**
     * Handles one raw payload from the artifact-processor topic.
     *
     * @param message raw payload bytes, decoded as UTF-8 for logging
     */
    // BUGFIX: partition "0", not "1". ArtifactProcessorProducer publishes to
    // partition 0, so a listener pinned to partition 1 never received anything.
    @KafkaListener(id = "artifact-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, partitions = {"0"}), groupId = "core-file-garbage-group")
    public void consume(byte[] message) {
        // Explicit charset: new String(byte[]) used the platform default pre-Java 18.
        System.out.println("Consumed message: " + new String(message, StandardCharsets.UTF_8));
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.student.messaging.artifact_processor;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.student.messaging.topics.KafkaTopics;

public class ArtifactProcessorProducer {
private final KafkaTemplate<String, byte[]> template;

ArtifactProcessorProducer(KafkaTemplate<String, byte[]> template) {
this.template = template;
}

public void send(byte[] message) {
template.send(new ProducerRecord<>(KafkaTopics.ARTIFACT_PROCESSOR_TOPIC, 0, null, message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.student.messaging.data_processor;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

@Service
public class DataProcessorListener {
@KafkaListener(id = "data-processor-listener", topicPartitions = @TopicPartition(topic = KafkaTopics.DATA_PROCESSOR_TOPIC, partitions = {"1"}), groupId = "core-file-garbage-group")
public void consume(byte[] message) {
System.out.println("Consumed message: " + new String(message));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.student.messaging.data_processor;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.student.messaging.topics.KafkaTopics;

/**
 * Publishes raw byte[] payloads (no key) to partition 0 of the
 * data-processor topic.
 */
@Service
public class DataProcessorProducer {

    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    DataProcessorProducer(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Asynchronously sends one message to partition 0 of the data-processor topic. */
    public void send(byte[] message) {
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(KafkaTopics.DATA_PROCESSOR_TOPIC, 0, null, message);
        kafkaTemplate.send(record);
    }
}
12 changes: 12 additions & 0 deletions core-back-end/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ keycloak:
patterns:
- /secured/*
cors: true

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: core-file-garbage-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Loading
Loading