diff --git a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java index 2d05f28a..c9ee3eac 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java +++ b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java @@ -12,24 +12,22 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ExecutionException; import com.sngular.kloadgen.exception.KLoadGenException; import com.sngular.kloadgen.loadgen.BaseLoadGenerator; import com.sngular.kloadgen.model.HeaderMapping; -import com.sngular.kloadgen.property.editor.ReflectionUtils; import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool; +import com.sngular.kloadgen.serializer.AvroSerializer; import com.sngular.kloadgen.serializer.EnrichedRecord; -import com.sngular.kloadgen.serializer.EnrichedRecordSerializer; import com.sngular.kloadgen.util.ProducerKeysHelper; import com.sngular.kloadgen.util.PropsKeysHelper; import io.apicurio.registry.serde.Legacy4ByteIdHandler; import io.apicurio.registry.serde.SerdeConfig; +import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.StringUtils; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient; @@ -43,24 +41,11 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Serializer; -import org.reflections.Reflections; -import org.reflections.scanners.Scanners; -import org.reflections.util.ClasspathHelper; -import org.reflections.util.ConfigurationBuilder; public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable { private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s"; - private static final Set SERIALIZER_SET = new HashSet<>( - ReflectionUtils.extractSerializers( - new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)), - Serializer.class)); - - private static final Set VALUE_SERIALIZER_SET = new HashSet<>(ReflectionUtils.extractSerializers( - new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)), - EnrichedRecordSerializer.class)); - @Serial private static final long serialVersionUID = 1L; @@ -122,13 +107,10 @@ public void setupTest(final JavaSamplerContext context) { props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT); - final Serializer valueSerializer = (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(); producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(), - valueSerializer); - } catch (final KafkaException ex) { + (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance()); + } catch (final KafkaException | ClassNotFoundException ex) { getNewLogger().error(ex.getMessage(), ex); - } catch (final ClassNotFoundException e) { - getNewLogger().error(e.getMessage(), e); } catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) { throw new KLoadGenException(e); } @@ -157,7 +139,7 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) { if (Objects.nonNull(messageVal)) { try { - final var producerRecord = getProducerRecord(messageVal, enrichedKeyFlag(), enrichedValueFlag()); + final var producerRecord = getProducerRecord(messageVal); final var headersSB = new ArrayList<>(SamplerUtil.populateHeaders(kafkaHeaders, producerRecord)); sampleResult.setRequestHeaders(StringUtils.join(headersSB, ",")); @@ -191,30 +173,26 @@ private List safeGetKafkaHeaders(final JMeterContext jmeterContex return headerMappingList; } - private ProducerRecord getProducerRecord(final EnrichedRecord messageVal, final boolean keyFlag, final boolean valueFlag) { + private ProducerRecord getProducerRecord(final EnrichedRecord messageVal) { final ProducerRecord producerRecord; + + final String keySerializer = (String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + final String valueSerializer = (String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + if (keyMessageFlag) { if (Objects.isNull(keyGenerator)) { final var key = statelessGeneratorTool.generateObject("key", msgKeyType, 0, msgKeyValue); - producerRecord = new ProducerRecord<>(topic, key, getObject(messageVal, valueFlag)); + producerRecord = new ProducerRecord<>(topic, key, getObject(messageVal, valueSerializer)); } else { final var key = keyGenerator.nextMessage(); - producerRecord = new ProducerRecord<>(topic, getObject(key, keyFlag), getObject(messageVal, valueFlag)); + producerRecord = new ProducerRecord<>(topic, getObject(key, keySerializer), getObject(messageVal, valueSerializer)); } } else { - producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueFlag), getObject(messageVal, valueFlag)); + producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueSerializer), getObject(messageVal, valueSerializer)); } return producerRecord; } - private Boolean enrichedKeyFlag() { - return SERIALIZER_SET.contains(props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).toString()); - } - - private Boolean enrichedValueFlag() { - return VALUE_SERIALIZER_SET.contains(props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).toString()); - } - private void fillSamplerResult(final ProducerRecord producerRecord, final SampleResult sampleResult) { final String result = "key: " + producerRecord.key() @@ -242,8 +220,8 @@ private String prettyPrint(final RecordMetadata recordMetadata) { return String.format(TEMPLATE, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } - private Object getObject(final EnrichedRecord messageVal, final boolean isKloadSerializer) { - return isKloadSerializer ? messageVal : messageVal.getGenericRecord(); + private Object getObject(final EnrichedRecord messageVal, final String serializer) { + return (serializer.contains("com.sngular.kloadgen") && !serializer.contains("Generic")) ? messageVal : messageVal.getGenericRecord(); } } diff --git a/src/main/java/com/sngular/kloadgen/serializer/AvroSerializer.java b/src/main/java/com/sngular/kloadgen/serializer/AvroSerializer.java index 9da1d1fd..378cc67a 100644 --- a/src/main/java/com/sngular/kloadgen/serializer/AvroSerializer.java +++ b/src/main/java/com/sngular/kloadgen/serializer/AvroSerializer.java @@ -18,9 +18,10 @@ import org.apache.avro.io.EncoderFactory; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serializer; @Slf4j -public class AvroSerializer implements EnrichedRecordSerializer { +public class AvroSerializer implements Serializer { private static final byte MAGIC_BYTE = 0x0; diff --git a/src/main/java/com/sngular/kloadgen/serializer/CustomStringEnrichedRecordSerializer.java b/src/main/java/com/sngular/kloadgen/serializer/CustomStringEnrichedRecordSerializer.java deleted file mode 100644 index edf5128c..00000000 --- a/src/main/java/com/sngular/kloadgen/serializer/CustomStringEnrichedRecordSerializer.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.sngular.kloadgen.serializer; - -import java.util.Map; - -import org.apache.kafka.common.serialization.StringSerializer; - -public final class CustomStringEnrichedRecordSerializer implements EnrichedRecordSerializer { - private final StringSerializer stringSerializer; - - public CustomStringEnrichedRecordSerializer() { - stringSerializer = new StringSerializer(); - } - - public void configure(final Map configs, final boolean isKey) { - stringSerializer.configure(configs, isKey); - } - - public byte[] serialize(final String topic, final EnrichedRecord data) { - return stringSerializer.serialize(topic, data.toString()); - } -} diff --git a/src/main/java/com/sngular/kloadgen/serializer/EnrichedRecordSerializer.java b/src/main/java/com/sngular/kloadgen/serializer/EnrichedRecordSerializer.java deleted file mode 100644 index c83b9325..00000000 --- a/src/main/java/com/sngular/kloadgen/serializer/EnrichedRecordSerializer.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.sngular.kloadgen.serializer; - -import org.apache.kafka.common.serialization.Serializer; - -public interface EnrichedRecordSerializer extends Serializer { - -} \ No newline at end of file diff --git a/src/main/java/com/sngular/kloadgen/util/ProducerKeysHelper.java b/src/main/java/com/sngular/kloadgen/util/ProducerKeysHelper.java index 4e4553a5..69b9af58 100644 --- a/src/main/java/com/sngular/kloadgen/util/ProducerKeysHelper.java +++ b/src/main/java/com/sngular/kloadgen/util/ProducerKeysHelper.java @@ -6,8 +6,9 @@ package com.sngular.kloadgen.util; -import com.sngular.kloadgen.serializer.CustomStringEnrichedRecordSerializer; +import com.sngular.kloadgen.serializer.AvroSerializer; import com.sngular.kloadgen.serializer.GenericAvroRecordSerializer; +import org.apache.kafka.common.serialization.StringSerializer; public final class ProducerKeysHelper { @@ -19,7 +20,7 @@ public final class ProducerKeysHelper { public static final String KAFKA_TOPIC_CONFIG_DEFAULT = ""; - public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = CustomStringEnrichedRecordSerializer.class.getName(); + public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = AvroSerializer.class.getName(); public static final String VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT = GenericAvroRecordSerializer.class.getName();