Skip to content

Commit

Permalink
first version of messaging between core and archiver #9
Browse files Browse the repository at this point in the history
  • Loading branch information
Kasean committed Dec 29, 2023
1 parent d1776c3 commit d304ab9
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 0 deletions.
1 change: 1 addition & 0 deletions archiver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
37 changes: 37 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

    private final KafkaConsumer<String, byte[]> consumer;

    // Checked by the poll loop; flipped by stop() from another thread.
    private volatile boolean running = true;

    /**
     * Creates a Kafka consumer of String keys / byte[] values.
     *
     * @param bootstrapServers Kafka bootstrap servers, e.g. "localhost:9092"
     * @param groupId          consumer group id to join
     */
    public MessageConsumer(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Subscribes to {@code topic} and polls until {@link #stop()} is called.
     * Blocks the calling thread; the underlying consumer is always closed on
     * exit (the original looped forever and leaked the consumer).
     *
     * @param topic topic to subscribe to
     */
    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (running) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Decode explicitly as UTF-8: new String(byte[]) alone uses the
                    // platform default charset, which differs between machines.
                    System.out.println("Consumed message: " + new String(record.value(), StandardCharsets.UTF_8));
                }
            }
        } catch (WakeupException e) {
            // Expected when stop() wakes a blocked poll(); rethrow if unsolicited.
            if (running) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Signals the poll loop to exit and wakes a blocked {@code poll()}.
     * Safe to call from any thread; {@code wakeup()} is the documented
     * thread-safe way to interrupt a KafkaConsumer.
     */
    public void stop() {
        running = false;
        consumer.wakeup();
    }
}
26 changes: 26 additions & 0 deletions archiver/src/main/java/org/student/messaging/MessageProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageProducer {

    private final KafkaProducer<String, byte[]> producer;

    /**
     * Creates a Kafka producer of String keys / byte[] values.
     *
     * @param bootstrapServers Kafka bootstrap servers, e.g. "localhost:9092"
     */
    public MessageProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Sends {@code message} to {@code topic} asynchronously.
     * A completion callback reports broker failures; the original ignored the
     * returned Future, so send errors were silently dropped.
     *
     * @param topic   destination topic
     * @param message raw payload bytes
     */
    public void send(String topic, byte[] message) {
        producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to send message to topic " + topic + ": " + exception);
            }
        });
    }

    /**
     * Flushes buffered records and releases the producer's resources.
     * Without this the KafkaProducer's threads and sockets leak.
     */
    public void close() {
        producer.close();
    }
}
1 change: 1 addition & 0 deletions core-back-end/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation group: 'org.keycloak', name: 'keycloak-spring-boot-adapter', version: '18.0.2'
implementation 'org.keycloak:keycloak-admin-client:23.0.3'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
Expand Down
36 changes: 36 additions & 0 deletions core-back-end/src/main/java/org/student/messaging/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.student.messaging;

import jakarta.validation.Valid;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory for String keys / byte[] values, pointed at the
     * configured bootstrap servers.
     */
    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        return new DefaultKafkaProducerFactory<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
    }

    /**
     * KafkaTemplate built on {@link #producerFactory()}; the send API the
     * rest of the application injects.
     */
    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.student.messaging;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    /**
     * Receives raw payloads published to the archiver topic.
     * Decodes explicitly as UTF-8: {@code new String(byte[])} alone uses the
     * platform default charset, which differs between machines.
     *
     * @param message raw record value from Kafka
     */
    @KafkaListener(topics = "archiver-topic", groupId = "core-file-garbage-group")
    public void consume(byte[] message) {
        System.out.println("Consumed message: " + new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }

}
12 changes: 12 additions & 0 deletions core-back-end/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ keycloak:
patterns:
- /secured/*
cors: true

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: core-file-garbage-group
      auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ services:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "archiver-topic:1:1,data-processor-topic:1:1,artifact-processor-topic:1:1"
ports:
- "9092:9092"

Expand Down

0 comments on commit d304ab9

Please sign in to comment.