Skip to content

Commit

Permalink
#25 producers implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Siarhei_Kakichau committed Jul 5, 2024
1 parent f6e4754 commit 563e803
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 62 deletions.
2 changes: 2 additions & 0 deletions artifact-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ repositories {

dependencies {

implementation project(':messaging-api')

implementation 'org.yaml:snakeyaml:2.0'

implementation 'org.apache.kafka:kafka-clients:3.4.0'
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.student.api;

/**
 * Callback invoked for every consumed Kafka record, carrying the record key,
 * the record payload and the topic the record was read from.
 *
 * @param <K> type of the record key
 * @param <V> type of the record payload
 * @param <T> type of the topic identifier
 */
@FunctionalInterface
public interface ProcessMessageFunction<K, V, T> {

    /**
     * Processes a single consumed message.
     *
     * @param f     record key
     * @param s     record payload
     * @param topic topic the record originated from
     */
    void accept(K f, V s, T topic);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.VoidBiFunction;
import org.student.api.ProcessMessageFunction;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CreateConsumer implements MessageConsumer{
private final KafkaConsumer<String, byte[]> consumer;
private final VoidBiFunction<String, byte[]> service;
private final ProcessMessageFunction<String, byte[], String> service;
private final String topic;

public CreateConsumer(String bootstrapServers, String groupId, String topic, VoidBiFunction<String, byte[]> service) {
public CreateConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, byte[], String> service) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Expand All @@ -41,7 +41,7 @@ public void consume() {
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
service.accept(record.key(), record.value());
service.accept(record.key(), record.value(), topic);
}
} else
System.out.println("No messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.VoidBiFunction;
import org.student.api.ProcessMessageFunction;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -15,10 +15,10 @@

public class DeleteConsumer implements MessageConsumer{
private final KafkaConsumer<String, String> consumer;
private final VoidBiFunction<String, UUID> service;
private final ProcessMessageFunction<String, UUID, String> service;
private final String topic;

public DeleteConsumer(String bootstrapServers, String groupId, String topic, VoidBiFunction<String, UUID> service) {
public DeleteConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, UUID, String> service) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Expand All @@ -40,7 +40,7 @@ public void consume() {
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumed message: " + record.value());
service.accept(record.key(), UUID.fromString(record.value()));
service.accept(record.key(), UUID.fromString(record.value()), topic);
}
} else
System.out.println("No messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.VoidBiFunction;
import org.student.api.ProcessMessageFunction;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -15,10 +15,10 @@

public class ReadConsumer implements MessageConsumer{
private final KafkaConsumer<String, String> consumer;
private final VoidBiFunction<String, UUID> service;
private final ProcessMessageFunction<String, UUID, String> service;
private final String topic;

public ReadConsumer(String bootstrapServers, String groupId, String topic, VoidBiFunction<String, UUID> service) {
public ReadConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, UUID, String> service) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Expand All @@ -40,7 +40,7 @@ public void consume() {
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumed message: " + record.value());
service.accept(record.key(), UUID.fromString(record.value()));
service.accept(record.key(), UUID.fromString(record.value()), topic);
}
} else
System.out.println("No messages");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.student.api.managers;

import org.student.messaging.models.BaseArtifactMessage;

/**
 * Facade over Kafka producers: routes an artifact message to a destination
 * topic, hiding producer creation and caching from callers.
 */
public interface KafkaSendFacade {
    /**
     * Sends {@code message} to {@code topic} under {@code key}.
     *
     * @param <T>         payload type
     * @param topic       destination Kafka topic
     * @param key         record key for the published message
     * @param messageType concrete payload class — presumably used to configure
     *                    the value serializer; confirm against the implementation
     * @param message     payload to publish
     */
    <T extends BaseArtifactMessage> void send(String topic, String key, Class<T> messageType, T message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.student.api.managers;

import org.student.api.producers.MessageProducer;
import org.student.api.producers.MessageProducerImpl;
import org.student.messaging.models.BaseArtifactMessage;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Default {@link KafkaSendFacade} that lazily creates one producer per
 * (topic, message type) pair and caches it for reuse.
 *
 * <p>Thread-safe: the cache is a {@link ConcurrentHashMap} and
 * {@code computeIfAbsent} guarantees at most one producer per key.
 *
 * <p>NOTE(review): cached producers are never closed; consider a shutdown
 * hook that releases them when the application stops.
 */
public class KafkaSendFacadeImpl implements KafkaSendFacade {

    // Keyed by "<topic>|<messageType>". Keying by topic alone (as before) would
    // reuse a producer whose serializer was configured for a different payload
    // class when two message types are sent to the same topic.
    private final Map<String, MessageProducer<? extends BaseArtifactMessage>> producers = new ConcurrentHashMap<>();
    private final String bootstrapServer;

    /**
     * @param bootstrapServer Kafka bootstrap server(s) handed to every producer
     */
    public KafkaSendFacadeImpl(String bootstrapServer) {
        this.bootstrapServer = bootstrapServer;
    }

    /**
     * Sends {@code message} to {@code topic}, creating and caching a producer
     * for the topic/type pair on first use.
     */
    @Override
    public <T extends BaseArtifactMessage> void send(String topic, String key, Class<T> messageType, T message) {
        String cacheKey = topic + '|' + messageType.getName();
        @SuppressWarnings("unchecked") // cache key pins the cached producer to messageType
        MessageProducer<T> producer =
                (MessageProducer<T>) producers.computeIfAbsent(cacheKey, k -> createProducer(messageType, topic));
        producer.send(key, message);
    }

    // Builds a new type-aware producer bound to the given topic.
    private <T extends BaseArtifactMessage> MessageProducer<T> createProducer(Class<T> messageType, String topic) {
        return new MessageProducerImpl<>(bootstrapServer, topic, messageType);
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.student.api.producers;


import org.student.messaging.models.BaseArtifactMessage;

/**
 * Producer of artifact messages bound to a single destination (the
 * destination is fixed by the implementation, not per call).
 *
 * @param <T> concrete message type this producer publishes
 */
public interface MessageProducer<T extends BaseArtifactMessage> {
    /**
     * Publishes {@code message} under the given record {@code key}.
     */
    void send(String key, T message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.student.api.producers;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.student.messaging.models.BaseArtifactMessage;
import org.student.messaging.models.serializers.JsonSerializer;

import java.util.Properties;

/**
 * Kafka producer bound to one topic that serializes {@link BaseArtifactMessage}
 * payloads with the project's {@link JsonSerializer}.
 *
 * @param <T> concrete message type produced to the topic
 */
public class MessageProducerImpl<T extends BaseArtifactMessage> implements MessageProducer<T> {

    private final KafkaProducer<String, T> producer;
    private final String topic;

    /**
     * @param bootstrapServers comma-separated Kafka bootstrap servers
     * @param topic            destination topic for every message sent
     * @param messageType      payload class handed to the value serializer
     */
    public MessageProducerImpl(String bootstrapServers, String topic, Class<T> messageType) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Custom property presumably read by JsonSerializer to learn the target
        // class during configure() — TODO confirm JsonSerializer honors it.
        properties.put("value.serializer.type", messageType);
        this.producer = new KafkaProducer<>(properties);
        this.topic = topic;
    }

    /**
     * Publishes {@code message} keyed by {@code key}. The partition is chosen
     * by Kafka's default partitioner; the previous code hard-coded partition 1,
     * which fails on single-partition topics and skews load otherwise.
     */
    @Override
    public void send(String key, T message) {
        producer.send(new ProducerRecord<>(topic, key, message));
    }

    /** Flushes pending records and releases the underlying Kafka producer. */
    public void close() {
        producer.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

/**
 * CRUD-style contract for artifact messages; each operation publishes its
 * outcome to the supplied Kafka response topic.
 */
public interface ArtifactsService {

    /**
     * Encrypts and stores a new artifact, then publishes the created id.
     *
     * @param key             record key to reply under
     * @param artifactMessage raw artifact bytes
     * @param topic           response topic
     */
    void createArtifactMessage(String key, byte[] artifactMessage, String topic);

    /**
     * Reads and decrypts the artifact with {@code id}, then publishes its body.
     */
    void readArtifactMessage(String key, UUID id, String topic);

    void updateArtifactMessage(String key, UUID id, byte[] newArtifactMessage); // TODO: not in alfa

    /**
     * Deletes the artifact with {@code id}, then publishes the removed body.
     */
    void deleteArtifactMessage(String key, UUID id, String topic);

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package org.student.services;

import org.student.api.managers.KafkaSendFacade;
import org.student.api.managers.KafkaSendFacadeImpl;
import org.student.archiver.ArchiverService;
import org.student.archiver.ArchiverServiceImpl;
import org.student.configs.ApplicationConfig;
import org.student.api.MessageProducer;
import org.student.api.producers.MessageProducer;
import org.student.messaging.models.BaseArtifactMessage;
import org.student.messaging.models.BodyArtifactMessage;
import org.student.messaging.models.ResponseCode;
import org.student.repositories.ArtifactsIMStorage;
import org.student.repositories.ArtifactsRepository;

Expand All @@ -14,28 +19,38 @@ public class ArtifactsServiceImpl implements ArtifactsService {
private final ArchiverService archiver;
private final ArtifactsRepository repository = new ArtifactsIMStorage(); // TODO: add choice in config: real db or in memory storage

private final MessageProducer producer;
private final KafkaSendFacade producer;

public ArtifactsServiceImpl(ApplicationConfig config) {
this.archiver = new ArchiverServiceImpl(config.getKeyStore());
this.producer = new MessageProducer(config.getKafka().getBootstrapServers());
this.producer = new KafkaSendFacadeImpl(config.getKafka().getBootstrapServers());
}

@Override
public void createArtifactMessage(String key, byte[] artifactMessage) {
public void createArtifactMessage(String key, byte[] artifactMessage, String topic) {
var artifact = archiver.encrypt(artifactMessage);

var id = repository.saveArtifact(artifact);

// TODO: send to producer;
var artifactResponse = new BaseArtifactMessage();
artifactResponse.setResponseCode(ResponseCode.CREATED);
artifactResponse.setInternalId(id);

producer.send(topic, key, BaseArtifactMessage.class, artifactResponse);
}

@Override
public void readArtifactMessage(String key, UUID id) {
public void readArtifactMessage(String key, UUID id, String topic) {
var encryptArtifact = repository.getArtifact(id).orElseThrow(() -> new IllegalArgumentException("Artifact with id " + id + " not founded."));

var res = archiver.decrypt(encryptArtifact).getArtifactData();
// TODO: send to producer;

var artifactResponse = new BodyArtifactMessage();
artifactResponse.setArtifactBody(res);
artifactResponse.setResponseCode(ResponseCode.READED);
artifactResponse.setInternalId(id);

producer.send(topic, key, BodyArtifactMessage.class, artifactResponse);
}

@Override
Expand All @@ -44,11 +59,16 @@ public void updateArtifactMessage(String key, UUID id, byte[] newArtifactMessage
}

@Override
public void deleteArtifactMessage(String key, UUID id) {
public void deleteArtifactMessage(String key, UUID id, String topic) {
var encryptedDeletedArtifact = repository.deleteArtifact(id);

var res = archiver.decrypt(encryptedDeletedArtifact).getArtifactData();

// TODO: send to producer;
var artifactResponse = new BodyArtifactMessage();
artifactResponse.setArtifactBody(res);
artifactResponse.setResponseCode(ResponseCode.DELETED);
artifactResponse.setInternalId(id);

producer.send(topic, key, BodyArtifactMessage.class, artifactResponse);
}
}

0 comments on commit 563e803

Please sign in to comment.