Skip to content

Commit

Permalink
Merge pull request #30 from Kasean/25-create-file-service
Browse files Browse the repository at this point in the history
First version of artifacts service implementation
  • Loading branch information
Kasean authored Jul 5, 2024
2 parents 7f53937 + 66cafd2 commit c88b00b
Show file tree
Hide file tree
Showing 50 changed files with 1,377 additions and 106 deletions.
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
1. core application - 8080
2. keycloak - 9990

### Hypothetical structure
### Base structure

![Structure](docs/pictures/FileCloudStorageBaseStruct.png)

### Modules docs:

#### 1. Artifact Processor

##### Base structure:

![ArtifactProcessorStructure](docs/pictures/ArtifactProcessorBaseStructure.png)

Core data storage and Archiver data storage modules — in the alpha version this is a simple in-memory storage (e.g. a Map).
In the first beta version — [Riak KV](https://riak.com/products/integrations/) or Apache Cassandra.

![Structure](docs/structure.png)
20 changes: 0 additions & 20 deletions archiver/build.gradle

This file was deleted.

10 changes: 0 additions & 10 deletions archiver/src/main/java/org/student/Application.java

This file was deleted.

26 changes: 0 additions & 26 deletions archiver/src/main/java/org/student/messaging/MessageProducer.java

This file was deleted.

8 changes: 8 additions & 0 deletions artifact-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ repositories {
}

dependencies {

implementation project(':messaging-api')

implementation 'org.yaml:snakeyaml:2.0'

implementation 'org.apache.kafka:kafka-clients:3.4.0'
implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1'
implementation 'org.bouncycastle:bcpkix-jdk18on:1.78.1'

testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
63 changes: 61 additions & 2 deletions artifact-processor/src/main/java/org/student/Application.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,66 @@
package org.student;

import org.student.api.factories.ConsumerFactory;
import org.student.api.managers.ConsumersManager;
import org.student.api.managers.ConsumersManagerImpl;
import org.student.configs.ApplicationConfig;
import org.student.api.consumers.MessageConsumer;
import org.student.services.ArtifactsService;
import org.student.services.ArtifactsServiceImpl;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.env.EnvScalarConstructor;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicReference;

import static org.yaml.snakeyaml.env.EnvScalarConstructor.ENV_FORMAT;
import static org.yaml.snakeyaml.env.EnvScalarConstructor.ENV_TAG;

public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");

private static final AtomicReference<ConsumersManager> consumersManager = new AtomicReference<>(null);

public static void main(String[] args) throws IOException {

if (args.length != 1) {
System.err.println("Config missing.");
return;
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown hook is running!");

if (consumersManager.get() != null){
consumersManager.get().shutdown();
}

}));

ApplicationConfig config = loadConfig(args[0]);

new Thread(() -> {
ArtifactsService artifactsService = new ArtifactsServiceImpl(config);

var consumers = ConsumerFactory.createConsumers(config.getKafka(), artifactsService);

if (consumersManager.get() == null) {
consumersManager.set(new ConsumersManagerImpl(consumers));
consumersManager.get().startListenMessages();
}
}).start();
}

private static ApplicationConfig loadConfig(String configFile) throws IOException {
Yaml yaml = new Yaml(new EnvScalarConstructor());
yaml.addImplicitResolver(ENV_TAG, ENV_FORMAT, "$");

ApplicationConfig config;
try (InputStream in = Files.newInputStream(Paths.get(configFile))) {
config = yaml.loadAs(in, ApplicationConfig.class);
}

return config;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.student.messaging;
package org.student.api.consumers;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -7,25 +7,32 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.utils.ProcessMessageFunction;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

public class CreateConsumer implements MessageConsumer{
private final KafkaConsumer<String, byte[]> consumer;
private final ProcessMessageFunction<String, byte[], String> service;
private final String topic;

public MessageConsumer(String bootstrapServers, String groupId) {
public CreateConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, byte[], String> service) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);

this.service = service;
this.topic = topic;
}

public void consume(String topic) {

@Override
public void consume() {
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));

Expand All @@ -34,10 +41,10 @@ public void consume(String topic) {
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
service.accept(record.key(), record.value(), topic);
}
} else
System.out.println("No messages");
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.student.api.consumers;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.utils.ProcessMessageFunction;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

/**
 * Kafka consumer for artifact "delete" messages. Every record value is the
 * textual UUID of the artifact to remove; the record key, the parsed UUID and
 * the topic name are forwarded to the service callback.
 */
public class DeleteConsumer implements MessageConsumer {

    private final KafkaConsumer<String, String> consumer;
    // Handler invoked per record: (key, artifact id, topic).
    private final ProcessMessageFunction<String, UUID, String> service;
    private final String topic;

    /**
     * @param bootstrapServers Kafka bootstrap servers (host:port list)
     * @param groupId          consumer group id
     * @param topic            topic to read delete requests from
     * @param service          callback invoked for every consumed record
     */
    public DeleteConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, UUID, String> service) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        this.consumer = new KafkaConsumer<>(props);
        this.service = service;
        this.topic = topic;
    }

    /** Polls partition 0 of the configured topic forever, dispatching every delete request. */
    @Override
    public void consume() {
        consumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));

        while (true) {
            final ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            if (batch.isEmpty()) {
                System.out.println("No messages");
                continue;
            }
            for (ConsumerRecord<String, String> rec : batch) {
                System.out.println("Consumed message: " + rec.value());
                service.accept(rec.key(), UUID.fromString(rec.value()), topic);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.student.api.consumers;

/**
 * A message consumer that reads from a messaging source and processes records.
 * Implementations in this package (create/read/update/delete) each bind one
 * Kafka topic to a handler.
 */
public interface MessageConsumer {

    /**
     * Starts consuming messages. Implementations in this package loop
     * indefinitely, so this call is expected to block its thread.
     */
    void consume();
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
package org.student.messaging;
package org.student.api.consumers;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.api.utils.ProcessMessageFunction;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

public class MessageConsumer {
public class ReadConsumer implements MessageConsumer{
private final KafkaConsumer<String, String> consumer;
private final ProcessMessageFunction<String, UUID, String> service;
private final String topic;

private final KafkaConsumer<String, byte[]> consumer;

public MessageConsumer(String bootstrapServers, String groupId) {
public ReadConsumer(String bootstrapServers, String groupId, String topic, ProcessMessageFunction<String, UUID, String> service) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);

this.service = service;
this.topic = topic;
}

public void consume(String topic) {
@Override
public void consume() {
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));

while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Consumed message: " + record.value());
service.accept(record.key(), UUID.fromString(record.value()), topic);
}
} else
System.out.println("No messages");
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.student.api.consumers;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.student.configs.ApplicationConfig;
import org.student.services.ArtifactsService;
import org.student.services.ArtifactsServiceImpl;

import java.util.Properties;

/**
 * Kafka consumer for artifact "update" messages.
 *
 * <p>NOTE(review): this is a stub — {@link #consume()} is empty, so the
 * configured consumer never polls anything. Also, unlike the sibling
 * create/read/delete consumers, it builds its own {@code ArtifactsServiceImpl}
 * from the config instead of receiving a {@code ProcessMessageFunction};
 * consider aligning the constructor when the update flow is implemented.
 */
public class UpdateConsumer implements MessageConsumer{
    private final KafkaConsumer<String, byte[]> consumer;
    // Service the (future) update handling is expected to delegate to.
    private final ArtifactsService artifactsService;

    /**
     * Builds the Kafka consumer from the application config (bootstrap servers,
     * group id) and instantiates the artifacts service.
     *
     * @param config application configuration providing Kafka settings
     */
    public UpdateConsumer(ApplicationConfig config) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafka().getBootstrapServers());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getKafka().getGroupId());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);

        this.artifactsService = new ArtifactsServiceImpl(config);
    }

    /** Intentionally a no-op placeholder: update handling is not implemented yet. */
    @Override
    public void consume() { // Not implemented

    }
}
Loading

0 comments on commit c88b00b

Please sign in to comment.