Skip to content

Commit

Permalink
Merge pull request #20 from Kasean/12-artifact-processor-messaging
Browse files Browse the repository at this point in the history
artifact processor messaging #12
  • Loading branch information
Kasean authored Dec 29, 2023
2 parents e5f7daf + 719c3af commit d95d3c7
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 2 deletions.
1 change: 1 addition & 0 deletions artifact-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
}

dependencies {
implementation 'org.apache.kafka:kafka-clients:3.4.0'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.student;

public class Main {
public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.student.messaging;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MessageConsumer {

private final KafkaConsumer<String, byte[]> consumer;

public MessageConsumer(String bootstrapServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);
}

public void consume(String topic) {
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));

while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("Consumed message: " + new String(record.value()));
}
} else
System.out.println("No messages");
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.student.messaging;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MessageProducer {

private final KafkaProducer<String, byte[]> producer;

public MessageProducer(String bootstrapServers) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
this.producer = new KafkaProducer<>(properties);
}

public void send(String topic, byte[] message) {
producer.send(new ProducerRecord<>(topic, 1, null, message));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.student;

public class Main {
public class Application {
public static void main(String[] args) {
System.out.println("Hello world!");
}
Expand Down

0 comments on commit d95d3c7

Please sign in to comment.