Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decorate user-provided instances of ProducerRecord, instead of using them as a raw values #813

Merged
merged 7 commits into from
Aug 16, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static java.util.function.Predicate.not;
import static java.util.stream.Stream.concat;

/**
* Implementation of the {@link io.micronaut.configuration.kafka.annotation.KafkaClient} advice annotation.
*
Expand Down Expand Up @@ -475,14 +478,28 @@ private MessagingClientException wrapException(MethodInvocationContext<Object, O
}

private ProducerRecord<?, ?> buildProducerRecord(MethodInvocationContext<Object, Object> context, ProducerState producerState, Object value) {
return new ProducerRecord<>(
return decorateProducerRecord(context, producerState, value)
.orElseGet(() -> new ProducerRecord<>(
producerState.topicSupplier.get(context),
producerState.partitionSupplier.get(context),
producerState.timestampSupplier.get(context),
producerState.keySupplier.get(context),
value,
producerState.headersSupplier.get(context)
);
producerState.headersSupplier.get(context)));
}

private Optional<ProducerRecord<?, ?>> decorateProducerRecord(MethodInvocationContext<Object, Object> context, ProducerState producerState, Object value) {
return Optional.ofNullable(value)
.filter(ProducerRecord.class::isInstance).map(ProducerRecord.class::cast)
.map(r -> new ProducerRecord<>(
Optional.ofNullable(r.topic()).filter(not(String::isEmpty)).orElseGet(() -> producerState.topicSupplier.get(context)),
Optional.ofNullable(r.partition()).orElseGet(() -> producerState.partitionSupplier.get(context)),
Optional.ofNullable(r.timestamp()).orElseGet(() -> producerState.timestampSupplier.get(context)),
Optional.ofNullable(r.key()).orElseGet(() -> producerState.keySupplier.get(context)),
r.value(),
concat(Optional.ofNullable(producerState.headersSupplier.get(context)).stream(),
Optional.ofNullable(r.headers()).map(Headers::toArray).map(Arrays::asList).stream())
guillermocalvo marked this conversation as resolved.
Show resolved Hide resolved
.flatMap(Collection::stream).toList()));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.messaging.MessageHeaders
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeader
Expand All @@ -28,6 +31,8 @@ class KafkaProducerSpec extends AbstractKafkaContainerSpec {

public static final String TOPIC_BLOCKING = "KafkaProducerSpec-users-blocking"
public static final String TOPIC_QUANTITY = "KafkaProducerSpec-users-quantity"
public static final String TOPIC_RECORDS = "KafkaProducerSpec-records"
public static final String TOPIC_RECORDS_BATCH = "KafkaProducerSpec-records-batch"

Map<String, Object> getConfiguration() {
super.configuration +
Expand All @@ -47,7 +52,7 @@ class KafkaProducerSpec extends AbstractKafkaContainerSpec {
"kafka.consumers.default.value.deserializer": StringDeserializer.name,
"kafka.consumers.default.value-deserializer": StringDeserializer.name,
"kafka.consumers.default.valueDeserializer" : StringDeserializer.name,
(EMBEDDED_TOPICS) : [TOPIC_BLOCKING]]
(EMBEDDED_TOPICS) : [TOPIC_BLOCKING, TOPIC_RECORDS, TOPIC_RECORDS_BATCH]]
}

void "test customize defaults"() {
Expand Down Expand Up @@ -134,6 +139,103 @@ class KafkaProducerSpec extends AbstractKafkaContainerSpec {
}
}

void "test send producer record"() {
given:
ProducerRecordClient client = context.getBean(ProducerRecordClient)
ConsumerRecordListener listener = context.getBean(ConsumerRecordListener)

when:
client.sendRecord("c", "DEFAULT KEY", new ProducerRecord<>("", "one", "ONE"))
client.sendRecord("c", "DEFAULT KEY", new ProducerRecord<>("", "TWO"))
client.sendRecord("c", "DEFAULT KEY", new ProducerRecord<>("", null, "three", "THREE", [
new RecordHeader("A", "a2".bytes),
new RecordHeader("B", "b2".bytes),
new RecordHeader("C", "c2".bytes),
new RecordHeader("D", "d2".bytes)]))

then:
conditions.eventually {
listener.records.size() == 3
listener.records[0].key() == "one"
listener.records[0].value() == "ONE"
listener.records[0].topic() == TOPIC_RECORDS
listener.records[0].partition() == 0
listener.records[0].timestamp() > 0
listener.records[0].headers().headers("A").any { it.value() == "a".bytes }
listener.records[0].headers().headers("B").any { it.value() == "b".bytes }
listener.records[0].headers().headers("C").any { it.value() == "c".bytes }
listener.records[1].key() == "DEFAULT KEY"
listener.records[1].value() == "TWO"
listener.records[1].topic() == TOPIC_RECORDS
listener.records[1].partition() == 0
listener.records[1].timestamp() > 0
listener.records[1].headers().headers("A").any { it.value() == "a".bytes }
listener.records[1].headers().headers("B").any { it.value() == "b".bytes }
listener.records[1].headers().headers("C").any { it.value() == "c".bytes }
listener.records[2].key() == "three"
listener.records[2].value() == "THREE"
listener.records[2].topic() == TOPIC_RECORDS
listener.records[2].partition() == 0
listener.records[2].timestamp() > 0
listener.records[2].headers().headers("A").any { it.value() == "a".bytes }
listener.records[2].headers().headers("B").any { it.value() == "b".bytes }
listener.records[2].headers().headers("C").any { it.value() == "c".bytes }
listener.records[2].headers().headers("A").any { it.value() == "a2".bytes }
listener.records[2].headers().headers("B").any { it.value() == "b2".bytes }
listener.records[2].headers().headers("C").any { it.value() == "c2".bytes }
listener.records[2].headers().headers("D").any { it.value() == "d2".bytes }
}
}

void "test batch send producer record"() {
given:
ProducerRecordBatchClient client = context.getBean(ProducerRecordBatchClient)
ConsumerRecordListener listener = context.getBean(ConsumerRecordListener)

when:
client.sendRecords("c", "DEFAULT KEY", List.of(
new ProducerRecord<>("", "one", "ONE"),
new ProducerRecord<>("", "TWO"),
new ProducerRecord<>("", null, "three", "THREE", [
new RecordHeader("A", "a2".bytes),
new RecordHeader("B", "b2".bytes),
new RecordHeader("C", "c2".bytes),
new RecordHeader("D", "d2".bytes)])))

then:
conditions.eventually {
listener.batch.size() == 3
listener.batch[0].key() == "one"
listener.batch[0].value() == "ONE"
listener.batch[0].topic() == TOPIC_RECORDS_BATCH
listener.batch[0].partition() == 0
listener.batch[0].timestamp() > 0
listener.batch[0].headers().headers("A").any { it.value() == "a".bytes }
listener.batch[0].headers().headers("B").any { it.value() == "b".bytes }
listener.batch[0].headers().headers("C").any { it.value() == "c".bytes }
listener.batch[1].key() == "DEFAULT KEY"
listener.batch[1].value() == "TWO"
listener.batch[1].topic() == TOPIC_RECORDS_BATCH
listener.batch[1].partition() == 0
listener.batch[1].timestamp() > 0
listener.batch[1].headers().headers("A").any { it.value() == "a".bytes }
listener.batch[1].headers().headers("B").any { it.value() == "b".bytes }
listener.batch[1].headers().headers("C").any { it.value() == "c".bytes }
listener.batch[2].key() == "three"
listener.batch[2].value() == "THREE"
listener.batch[2].topic() == TOPIC_RECORDS_BATCH
listener.batch[2].partition() == 0
listener.batch[2].timestamp() > 0
listener.batch[2].headers().headers("A").any { it.value() == "a".bytes }
listener.batch[2].headers().headers("B").any { it.value() == "b".bytes }
listener.batch[2].headers().headers("C").any { it.value() == "c".bytes }
listener.batch[2].headers().headers("A").any { it.value() == "a2".bytes }
listener.batch[2].headers().headers("B").any { it.value() == "b2".bytes }
listener.batch[2].headers().headers("C").any { it.value() == "c2".bytes }
listener.batch[2].headers().headers("D").any { it.value() == "d2".bytes }
}
}

@Requires(property = 'spec.name', value = 'KafkaProducerSpec')
@KafkaClient(acks = ALL, id = "named")
static interface NamedClient {
Expand Down Expand Up @@ -259,4 +361,33 @@ class KafkaProducerSpec extends AbstractKafkaContainerSpec {
event.bean
}
}

@Requires(property = 'spec.name', value = 'KafkaProducerSpec')
@KafkaClient
@MessageHeader(name = "A", value = "a")
static interface ProducerRecordClient {
@Topic(TOPIC_RECORDS)
@MessageHeader(name = "B", value = "b")
String sendRecord(@MessageHeader(name = "C") String header, @KafkaKey String key, ProducerRecord<String, String> record)
}

@Requires(property = 'spec.name', value = 'KafkaProducerSpec')
@KafkaClient(batch = true)
@MessageHeader(name = "A", value = "a")
static interface ProducerRecordBatchClient {
@Topic(TOPIC_RECORDS_BATCH)
@MessageHeader(name = "B", value = "b")
String sendRecords(@MessageHeader(name = "C") String header, @KafkaKey String key, List<ProducerRecord<String, String>> records)
}

@Requires(property = 'spec.name', value = 'KafkaProducerSpec')
@KafkaListener(offsetReset = EARLIEST)
static class ConsumerRecordListener {
List<ConsumerRecord<String, String>> records = []
List<ConsumerRecord<String, String>> batch = []
@Topic(TOPIC_RECORDS)
void receive(ConsumerRecord<String, String> record) { records << record }
@Topic(TOPIC_RECORDS_BATCH)
void receiveBatch(ConsumerRecord<String, String> record) { batch << record }
}
}