diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index a22fcfd8b4..6487f9f257 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -63,6 +62,7 @@ * @author Biju Kunjummen * @author Sanghyeok An * @author Hope Kim + * @author Borahm Lee * @since 1.1 */ public class BatchMessagingMessageConverter implements BatchMessageConverter { @@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } @Override // NOSONAR - public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { + public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, + Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -165,65 +166,39 @@ public Message toMessage(List> records, @Nullable Acknow addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures); commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps); - records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, - convertedHeaders, natives, raws, conversionFailures, rawHeaders, type)); - return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); - } - private void processRecord(ConsumerRecord record, List payloads, List keys, - List topics, List partitions, List offsets, - List timestampTypes, List timestamps, List> convertedHeaders, - List natives, List> raws, List conversionFailures, - Map rawHeaders, Type type) { - payloads.add(obtainPayload(type, record, conversionFailures)); - keys.add(record.key()); - topics.add(record.topic()); - partitions.add(record.partition()); - offsets.add(record.offset()); - - if (record.timestampType() != null) { - timestampTypes.add(record.timestampType().name()); - } - timestamps.add(record.timestamp()); - - boolean logged = false; - String info = null; - - if (this.headerMapper != null && record.headers() != null) { - Map converted = new HashMap<>(); - this.headerMapper.toHeaders(record.headers(), converted); - convertedHeaders.add(converted); - Object object = converted.get(KafkaHeaders.LISTENER_INFO); - info = Optional.ofNullable(object) - .filter(String.class::isInstance) - .map(String.class::cast) - .orElse(null); - } - else { - if (!logged) { - logHeaderWarningOnce(); - logged = true; + String listenerInfo = null; + for (ConsumerRecord record : records) { + addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); + if (this.headerMapper != null && record.headers() != null) { + addToConvertedHeaders(record.headers(), convertedHeaders); + if (!convertedHeaders.isEmpty()) { + Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO); + if (obj != null && obj instanceof String) { + listenerInfo = (String) obj; + } + } + } else { + natives.add(record.headers()); + } + if (this.rawRecordHeader) { + raws.add(record); } - natives.add(record.headers()); } - if (this.rawRecordHeader) { - raws.add(record); + if (this.headerMapper == null) { + this.logger.warn(() -> + "No header mapper is available; Jackson is required for the default mapper; " + + "headers (if present) are not mapped but provided raw in " + + KafkaHeaders.NATIVE_HEADERS); } - if (info != null) { - rawHeaders.put(KafkaHeaders.LISTENER_INFO, info); + if (listenerInfo != null) { + rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo); } - } - - private void logHeaderWarningOnce() { - this.logger.debug(() -> - "No header mapper is available; Jackson is required for the default mapper; " - + "headers (if present) are not mapped but provided raw in " - + KafkaHeaders.NATIVE_HEADERS); + return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); } private void addToRawHeaders(Map rawHeaders, List> convertedHeaders, List natives, List> raws, List conversionFailures) { - if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } @@ -236,12 +211,32 @@ private void addToRawHeaders(Map rawHeaders, List record, Type type, List payloads, List keys, + List topics, List partitions, List offsets, List timestampTypes, + List timestamps, List conversionFailures) { + payloads.add(obtainPayload(type, record, conversionFailures)); + keys.add(record.key()); + topics.add(record.topic()); + partitions.add(record.partition()); + offsets.add(record.offset()); + timestamps.add(record.timestamp()); + if (record.timestampType() != null) { + timestampTypes.add(record.timestampType().name()); + } + } + private Object obtainPayload(Type type, ConsumerRecord record, List conversionFailures) { return this.recordConverter == null || !containerType(type) ? extractAndConvertValue(record, type) : convert(record, type, conversionFailures); } + private void addToConvertedHeaders(Headers headers, List> convertedHeaders) { + Map converted = new HashMap<>(); + this.headerMapper.toHeaders(headers, converted); + convertedHeaders.add(converted); + } + @Override public List> fromMessage(Message message, String defaultTopic) { throw new UnsupportedOperationException();