Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#313 solved serializers casting problems using SpecificDatumWriter #315

Merged
merged 14 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions config/checkstyle/OSS_checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
"https://checkstyle.org/dtds/suppressions_1_2_xpath_experimental.dtd">

<suppressions>
<suppress checks="VisibilityModifier" files="[/\\].*model.*\.java"/>
<suppress checks="VisibilityModifier" files="[/\\].*EnrichedRecord\.java"/>
<suppress checks="DesignForExtension" files="[/\\].*model.*\.java"/>
<suppress checks="AbstractClassName" files="[/\\].*Field\.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Sampler\.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*PropertyEditor\.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ProcessorTest\.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Standalone\.java"/>
<suppress checks="ExecutableStatementCount" files="[/\\].*Standalone\.java"/>
<suppress checks="AnonInnerLength" files="[/\\].*AutoCompletion\.java"/>
<suppress checks="Indentation" files="[/\\].*ExtractorTest\.java"/>
<suppress checks="CyclomaticComplexity" files="[/\\].*\.java"/>
<suppress checks="VisibilityModifier" files="[/\\].*model.*\.java" />
<suppress checks="VisibilityModifier" files="[/\\].*EnrichedRecord\.java" />
<suppress checks="DesignForExtension" files="[/\\].*model.*\.java" />
<suppress checks="DesignForExtension" files="[/\\].*AvroSerializersTest\.java" />
<suppress checks="AbstractClassName" files="[/\\].*Field\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Sampler\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*PropertyEditor\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ProcessorTest\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Standalone\.java" />
<suppress checks="ClassDataAbstractionCoupling" files="[/\\].*AvroSerializersUtil\.java" />
<suppress checks="ExecutableStatementCount" files="[/\\].*Standalone\.java" />
<suppress checks="AnonInnerLength" files="[/\\].*AutoCompletion\.java" />
<suppress checks="Indentation" files="[/\\].*ExtractorTest\.java" />
<suppress checks="CyclomaticComplexity" files="[/\\].*\.java" />
</suppressions>
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.1.3</version>
<version>5.1.4</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial data base on Data specification</description>
Expand Down Expand Up @@ -167,6 +167,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>davidgarciago</id>
<name>David García Gondell</name>
<email>[email protected]</email>
<organization>Corunet</organization>
<organizationUrl>https://corunet.github.io/</organizationUrl>
<roles>
<role>Junior Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
8 changes: 8 additions & 0 deletions schema_registry_docker/conf/kafka/kafka-jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};

Client {};
53 changes: 53 additions & 0 deletions schema_registry_docker/docker-compose-kafka-auth.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SASL_ENABLED: "false"
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 29092:29092
- 9092:9092
volumes:
- ./conf/kafka/kafka-jaas.conf:/etc/kafka/kafka_server_jaas.conf
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://:9092, SASL_PLAINTEXT://:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,SASL_PLAINTEXT://localhost:29092
ZOOKEEPER_SASL_ENABLED: "false"
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

schema-registry:
image: confluentinc/cp-schema-registry
depends_on:
- zookeeper
- kafka
ports:
- 8081:8081
volumes:
- ./conf:/conf:ro
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
SCHEMA_REGISTRY_AUTHENTICATION_ROLES: Testers
SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema-registry/schema-registry.jaas
kafka-manager:
image: kafkamanager/kafka-manager
environment:
ZK_HOSTS: zookeeper
ports:
- 9000:9000
18 changes: 0 additions & 18 deletions src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.coru.kloadgen.util.ProducerKeysHelper;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
Expand All @@ -45,7 +42,6 @@ public final void setupTest(final JavaSamplerContext context) {
final var props = properties(context);
final var topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
consumer = new KafkaConsumer<>(props);
configGenericData();

consumer.subscribe(Collections.singletonList(topic));
}
Expand All @@ -60,20 +56,6 @@ public final Properties properties(final JavaSamplerContext context) {
return props;
}

private void configGenericData() {
final var genericData = GenericData.get();

genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
}

@Override
public final void teardownTest(final JavaSamplerContext context) {
if (Objects.nonNull(consumer)) {
Expand Down
32 changes: 12 additions & 20 deletions src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import net.coru.kloadgen.serializer.ProtobufSerializer;
import net.coru.kloadgen.util.ProducerKeysHelper;
import net.coru.kloadgen.util.PropsKeysHelper;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
Expand All @@ -44,16 +41,27 @@
public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {

private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s";

private static final Set<String> SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName());

private static final long serialVersionUID = 1L;

private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool();

private transient KafkaProducer<Object, Object> producer;

private String topic;

private String msgKeyType;

private List<String> msgKeyValue;

private boolean keyMessageFlag = false;

private transient BaseLoadGenerator generator;

private transient BaseLoadGenerator keyGenerator;

private transient Properties props;

@Override
Expand All @@ -62,8 +70,6 @@ public void setupTest(final JavaSamplerContext context) {

generator = SamplerUtil.configureValueGenerator(props);

configGenericData();

if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
Expand All @@ -72,7 +78,7 @@ public void setupTest(final JavaSamplerContext context) {
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
}
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
Expand All @@ -94,20 +100,6 @@ private Properties properties(final JavaSamplerContext context) {
return commonProps;
}

private void configGenericData() {
final var genericData = GenericData.get();

genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
}

@Override
public void teardownTest(final JavaSamplerContext context) {
if (Objects.nonNull(producer)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.nio.ByteBuffer;

import javax.xml.bind.DatatypeConverter;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -27,6 +26,10 @@ public class AvroSerializer<T extends EnrichedRecord> implements Serializer<T> {

private static final int ID_SIZE = 4;

public AvroSerializer() {
AvroSerializersUtil.setupLogicalTypesConversion();
}

@Override
public final byte[] serialize(final String topic, final T data) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package net.coru.kloadgen.serializer;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;

public final class AvroSerializersUtil {

private AvroSerializersUtil() {
}

public static void setupLogicalTypesConversion() {
final var genericData = GenericData.get();

genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class GenericAvroRecordBinarySerializer<T extends GenericRecord> implements Serializer<T> {

public GenericAvroRecordBinarySerializer() {
AvroSerializersUtil.setupLogicalTypesConversion();
}

@Override
public final byte[] serialize(final String s, final T data) {
final var writer = new SpecificDatumWriter<>(data.getSchema());
final DatumWriter<T> writer = new GenericDatumWriter<>(data.getSchema());

var result = new byte[]{};
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final var encoder = EncoderFactory.get().binaryEncoder(baos, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class GenericAvroRecordSerializer<T extends GenericRecord> implements Serializer<T> {

public GenericAvroRecordSerializer() {
AvroSerializersUtil.setupLogicalTypesConversion();
}

@Override
public final byte[] serialize(final String topic, final T data) {
final DatumWriter<T> writer = new GenericDatumWriter<>(data.getSchema());

final var writer = new SpecificDatumWriter<>(data.getSchema());
byte[] dataBytes = new byte[0];
final var stream = new ByteArrayOutputStream();
final Encoder jsonEncoder;
Expand Down
Loading