From 9ea2f92fb916423f3546a65ecbcc9cf06bd44c1a Mon Sep 17 00:00:00 2001 From: David Garcia Date: Thu, 15 Dec 2022 13:57:34 +0100 Subject: [PATCH] #313 moving avro logical types conversion config to avro serializers --- .../sampler/KafkaConsumerSampler.java | 18 ------ .../sampler/KafkaProducerSampler.java | 32 ++++------- .../kloadgen/serializer/AvroSerializer.java | 5 +- .../serializer/AvroSerializersUtil.java | 22 ++++++++ .../GenericAvroRecordBinarySerializer.java | 4 ++ .../GenericAvroRecordSerializer.java | 4 ++ .../serializer/AvroSerializersTest.java | 56 +++++-------------- 7 files changed, 61 insertions(+), 80 deletions(-) create mode 100644 src/main/java/net/coru/kloadgen/serializer/AvroSerializersUtil.java diff --git a/src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java b/src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java index 67b0f549..54606d0b 100644 --- a/src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java +++ b/src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java @@ -19,9 +19,6 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.coru.kloadgen.util.ProducerKeysHelper; -import org.apache.avro.Conversions; -import org.apache.avro.data.TimeConversions; -import org.apache.avro.generic.GenericData; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient; import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext; @@ -45,7 +42,6 @@ public final void setupTest(final JavaSamplerContext context) { final var props = properties(context); final var topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG); consumer = new KafkaConsumer<>(props); - configGenericData(); consumer.subscribe(Collections.singletonList(topic)); } @@ -60,20 +56,6 @@ public final Properties properties(final JavaSamplerContext context) { return props; } - private void configGenericData() { - final var genericData = GenericData.get(); - - genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); - genericData.addLogicalTypeConversion(new Conversions.UUIDConversion()); - } - @Override public final void teardownTest(final JavaSamplerContext context) { if (Objects.nonNull(consumer)) { diff --git a/src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java b/src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java index df571f3e..963368a5 100644 --- a/src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java +++ b/src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java @@ -25,9 +25,6 @@ import net.coru.kloadgen.serializer.ProtobufSerializer; import net.coru.kloadgen.util.ProducerKeysHelper; import net.coru.kloadgen.util.PropsKeysHelper; -import org.apache.avro.Conversions; -import org.apache.avro.data.TimeConversions; -import org.apache.avro.generic.GenericData; import org.apache.commons.lang3.StringUtils; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient; @@ -44,16 +41,27 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable { private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s"; + private static final Set SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName()); + private static final long serialVersionUID = 1L; + private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool(); + private transient KafkaProducer producer; + private String topic; + private String msgKeyType; + private List msgKeyValue; + private boolean keyMessageFlag = false; + private transient BaseLoadGenerator generator; + private transient BaseLoadGenerator keyGenerator; + private transient Properties props; @Override @@ -62,8 +70,6 @@ public void setupTest(final JavaSamplerContext context) { generator = SamplerUtil.configureValueGenerator(props); - configGenericData(); - if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY)) || "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { keyMessageFlag = true; @@ -72,7 +78,7 @@ public void setupTest(final JavaSamplerContext context) { } else { msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE); msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)) - ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); + ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); } } else { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); @@ -94,20 +100,6 @@ private Properties properties(final JavaSamplerContext context) { return commonProps; } - private void configGenericData() { - final var genericData = GenericData.get(); - - genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); - genericData.addLogicalTypeConversion(new Conversions.UUIDConversion()); - } - @Override public void teardownTest(final JavaSamplerContext context) { if (Objects.nonNull(producer)) { diff --git a/src/main/java/net/coru/kloadgen/serializer/AvroSerializer.java b/src/main/java/net/coru/kloadgen/serializer/AvroSerializer.java index 3ead9cc1..4444a024 100644 --- a/src/main/java/net/coru/kloadgen/serializer/AvroSerializer.java +++ b/src/main/java/net/coru/kloadgen/serializer/AvroSerializer.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import javax.xml.bind.DatatypeConverter; - import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -27,6 +26,10 @@ public class AvroSerializer implements Serializer { private static final int ID_SIZE = 4; + public AvroSerializer() { + AvroSerializersUtil.setupLogicalTypesConversion(); + } + @Override public final byte[] serialize(final String topic, final T data) { try { diff --git a/src/main/java/net/coru/kloadgen/serializer/AvroSerializersUtil.java b/src/main/java/net/coru/kloadgen/serializer/AvroSerializersUtil.java new file mode 100644 index 00000000..319c1022 --- /dev/null +++ b/src/main/java/net/coru/kloadgen/serializer/AvroSerializersUtil.java @@ -0,0 +1,22 @@ +package net.coru.kloadgen.serializer; + +import org.apache.avro.Conversions; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericData; + +public class AvroSerializersUtil { + + public static void setupLogicalTypesConversion() { + final var genericData = GenericData.get(); + + genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + genericData.addLogicalTypeConversion(new Conversions.UUIDConversion()); + } +} diff --git a/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordBinarySerializer.java b/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordBinarySerializer.java index 6eab1807..84b1373f 100644 --- a/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordBinarySerializer.java +++ b/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordBinarySerializer.java @@ -20,6 +20,10 @@ @Slf4j public class GenericAvroRecordBinarySerializer implements Serializer { + public GenericAvroRecordBinarySerializer() { + AvroSerializersUtil.setupLogicalTypesConversion(); + } + @Override public final byte[] serialize(final String s, final T data) { final DatumWriter writer = new GenericDatumWriter<>(data.getSchema()); diff --git a/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordSerializer.java b/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordSerializer.java index 1ca5c647..5c2549e9 100644 --- a/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordSerializer.java +++ b/src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordSerializer.java @@ -21,6 +21,10 @@ @Slf4j public class GenericAvroRecordSerializer implements Serializer { + public GenericAvroRecordSerializer() { + AvroSerializersUtil.setupLogicalTypesConversion(); + } + @Override public final byte[] serialize(final String topic, final T data) { final DatumWriter writer = new GenericDatumWriter<>(data.getSchema()); diff --git a/src/test/java/net/coru/kloadgen/serializer/AvroSerializersTest.java b/src/test/java/net/coru/kloadgen/serializer/AvroSerializersTest.java index de593f23..939d8aef 100644 --- a/src/test/java/net/coru/kloadgen/serializer/AvroSerializersTest.java +++ b/src/test/java/net/coru/kloadgen/serializer/AvroSerializersTest.java @@ -14,13 +14,9 @@ import net.coru.kloadgen.model.FieldValueMapping; import net.coru.kloadgen.processor.SchemaProcessor; import net.coru.kloadgen.testutil.FileHelper; -import org.apache.avro.Conversions; -import org.apache.avro.data.TimeConversions; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.common.serialization.Serializer; import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -36,49 +32,22 @@ private static Stream getSerializers() { ); } - @BeforeEach - void setUp() { - final var genericData = GenericData.get(); - - genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); - genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); - genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); - genericData.addLogicalTypeConversion(new Conversions.UUIDConversion()); - } - @ParameterizedTest @MethodSource("getSerializers") void recordSerializersTestLogicalTypes(final Serializer serializer) throws Exception { final var schemaFile = new FileHelper().getFile("/avro-files/testLogicalTypes.avsc"); final var schemaStr = readSchema(schemaFile); final var fieldValueMappings = Arrays.asList( - FieldValueMapping.builder().fieldName("Date").fieldType("int_date").valueLength(0).fieldValueList("[]").required(true) - .isAncestorRequired(true).build(), - FieldValueMapping.builder().fieldName("TimeMillis").fieldType("int_time-millis").valueLength(0).fieldValueList("[]").required(true) - .isAncestorRequired(true).build(), - FieldValueMapping.builder().fieldName("TimeMicros").fieldType("long_time-micros").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("TimestampMillis").fieldType("long_timestamp-millis").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("TimestampMicros").fieldType("long_timestamp-micros").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("LocalTimestampMillis").fieldType("long_local-timestamp-millis").valueLength(0).fieldValueList("[]").required(true) - .isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("LocalTimestampMicros").fieldType("long_local-timestamp-micros").valueLength(0).fieldValueList("[]").required(true) - .isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("UUID").fieldType("string_uuid").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("Decimal").fieldType("bytes_decimal").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build(), - FieldValueMapping.builder().fieldName("DecimalFixed").fieldType("fixed_decimal").valueLength(0).fieldValueList("[]").required(true).isAncestorRequired(true) - .build()); + createFieldValueMapping("Date", "int_date"), + createFieldValueMapping("TimeMillis", "int_time-millis"), + createFieldValueMapping("TimeMicros", "long_time-micros"), + createFieldValueMapping("TimestampMillis", "long_timestamp-millis"), + createFieldValueMapping("TimestampMicros", "long_timestamp-micros"), + createFieldValueMapping("LocalTimestampMillis", "long_local-timestamp-millis"), + createFieldValueMapping("LocalTimestampMicros", "long_local-timestamp-micros"), + createFieldValueMapping("UUID", "string_uuid"), + createFieldValueMapping("Decimal", "bytes_decimal"), + createFieldValueMapping("DecimalFixed", "fixed_decimal")); final var metadata = new SchemaMetadata(1, 1, schemaStr); final ParsedSchema parsedSchema = new SchemaExtractorImpl().schemaTypesList(schemaFile, "AVRO"); @@ -99,4 +68,9 @@ private static String readSchema(final File file) throws IOException { return contentBuilder.toString(); } + + private FieldValueMapping createFieldValueMapping(String name, String fieldType) { + return FieldValueMapping.builder().fieldName(name).fieldType(fieldType).valueLength(0).fieldValueList("[]").required(true) + .isAncestorRequired(true).build(); + } }