Skip to content

Commit

Permalink
Changed how we choose the Record for KafkaProducerSampler
Browse files Browse the repository at this point in the history
  • Loading branch information
pablorodriguez-sngular committed Dec 13, 2023
1 parent d8bba55 commit 6d7fec5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,22 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import com.sngular.kloadgen.exception.KLoadGenException;
import com.sngular.kloadgen.loadgen.BaseLoadGenerator;
import com.sngular.kloadgen.model.HeaderMapping;
import com.sngular.kloadgen.property.editor.ReflectionUtils;
import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool;
import com.sngular.kloadgen.serializer.AvroSerializer;
import com.sngular.kloadgen.serializer.EnrichedRecord;
import com.sngular.kloadgen.serializer.EnrichedRecordSerializer;
import com.sngular.kloadgen.util.ProducerKeysHelper;
import com.sngular.kloadgen.util.PropsKeysHelper;
import io.apicurio.registry.serde.Legacy4ByteIdHandler;
import io.apicurio.registry.serde.SerdeConfig;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
Expand All @@ -43,24 +41,11 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Serializer;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {

private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s";

private static final Set<String> SERIALIZER_SET = new HashSet<>(
ReflectionUtils.extractSerializers(
new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)),
Serializer.class));

private static final Set<String> VALUE_SERIALIZER_SET = new HashSet<>(ReflectionUtils.extractSerializers(
new Reflections(new ConfigurationBuilder().addUrls(ClasspathHelper.forClass(Serializer.class)).setScanners(Scanners.SubTypes)),
EnrichedRecordSerializer.class));

@Serial
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -122,13 +107,10 @@ public void setupTest(final JavaSamplerContext context) {
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT);

final Serializer valueSerializer = (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance();
producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(),
valueSerializer);
} catch (final KafkaException ex) {
(Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance());
} catch (final KafkaException | ClassNotFoundException ex) {
getNewLogger().error(ex.getMessage(), ex);
} catch (final ClassNotFoundException e) {
getNewLogger().error(e.getMessage(), e);
} catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) {
throw new KLoadGenException(e);
}
Expand Down Expand Up @@ -157,7 +139,7 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) {

if (Objects.nonNull(messageVal)) {
try {
final var producerRecord = getProducerRecord(messageVal, enrichedKeyFlag(), enrichedValueFlag());
final var producerRecord = getProducerRecord(messageVal);
final var headersSB = new ArrayList<>(SamplerUtil.populateHeaders(kafkaHeaders, producerRecord));

sampleResult.setRequestHeaders(StringUtils.join(headersSB, ","));
Expand Down Expand Up @@ -191,30 +173,26 @@ private List<HeaderMapping> safeGetKafkaHeaders(final JMeterContext jmeterContex
return headerMappingList;
}

private ProducerRecord<Object, Object> getProducerRecord(final EnrichedRecord messageVal, final boolean keyFlag, final boolean valueFlag) {
private ProducerRecord<Object, Object> getProducerRecord(final EnrichedRecord messageVal) {
final ProducerRecord<Object, Object> producerRecord;

final String keySerializer = (String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
final String valueSerializer = (String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);

if (keyMessageFlag) {
if (Objects.isNull(keyGenerator)) {
final var key = statelessGeneratorTool.generateObject("key", msgKeyType, 0, msgKeyValue);
producerRecord = new ProducerRecord<>(topic, key, getObject(messageVal, valueFlag));
producerRecord = new ProducerRecord<>(topic, key, getObject(messageVal, valueSerializer));
} else {
final var key = keyGenerator.nextMessage();
producerRecord = new ProducerRecord<>(topic, getObject(key, keyFlag), getObject(messageVal, valueFlag));
producerRecord = new ProducerRecord<>(topic, getObject(key, keySerializer), getObject(messageVal, valueSerializer));
}
} else {
producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueFlag), getObject(messageVal, valueFlag));
producerRecord = new ProducerRecord<>(topic, getObject(messageVal, valueSerializer), getObject(messageVal, valueSerializer));
}
return producerRecord;
}

private Boolean enrichedKeyFlag() {
return SERIALIZER_SET.contains(props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).toString());
}

private Boolean enrichedValueFlag() {
return VALUE_SERIALIZER_SET.contains(props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).toString());
}

private void fillSamplerResult(final ProducerRecord<Object, Object> producerRecord, final SampleResult sampleResult) {
final String result = "key: "
+ producerRecord.key()
Expand Down Expand Up @@ -242,8 +220,8 @@ private String prettyPrint(final RecordMetadata recordMetadata) {
return String.format(TEMPLATE, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}

private Object getObject(final EnrichedRecord messageVal, final boolean isKloadSerializer) {
return isKloadSerializer ? messageVal : messageVal.getGenericRecord();
private Object getObject(final EnrichedRecord messageVal, final String serializer) {
return (serializer.contains("com.sngular.kloadgen") && !serializer.contains("Generic")) ? messageVal : messageVal.getGenericRecord();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class AvroSerializer<T extends EnrichedRecord> implements EnrichedRecordSerializer<T> {
public class AvroSerializer<T extends EnrichedRecord> implements Serializer<T> {

private static final byte MAGIC_BYTE = 0x0;

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

package com.sngular.kloadgen.util;

import com.sngular.kloadgen.serializer.CustomStringEnrichedRecordSerializer;
import com.sngular.kloadgen.serializer.AvroSerializer;
import com.sngular.kloadgen.serializer.GenericAvroRecordSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class ProducerKeysHelper {

Expand All @@ -19,7 +20,7 @@ public final class ProducerKeysHelper {

public static final String KAFKA_TOPIC_CONFIG_DEFAULT = "<Topic>";

public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = CustomStringEnrichedRecordSerializer.class.getName();
public static final String KEY_SERIALIZER_CLASS_CONFIG_DEFAULT = AvroSerializer.class.getName();

public static final String VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT = GenericAvroRecordSerializer.class.getName();

Expand Down

0 comments on commit 6d7fec5

Please sign in to comment.