Commit

#313 solved serializers casting problems using SpecificDatumWriter (#315)

* #303 docker compose for securing kafka, first attempt

* #303 changed docker compose SSL to PLAINTEXT

* #303 kafka Auth Configuration

* #303 added user for producer to connect to kafka

* #313 solved serializers casting problems using SpecificDatumWriter

* #313 removed unused assignment to DatumWriter

* #313 updated pom.xml

* #313 datum writer changed to final

* #313 added new test for serializers with logical types and removed unused specific datum writer in serializers

* #313 solved checkstyle problems

* #313 moved avro logical types conversion config to avro serializers

* #313 fixed checkstyle errors

* #313 changed incorrect pom version

Co-authored-by: Jose Enrique García Maciñeiras <[email protected]>
davidgarciago and Jose Enrique García Maciñeiras authored Dec 19, 2022
1 parent 5aec03e commit cd006c0
Showing 11 changed files with 219 additions and 56 deletions.
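
The substance of the change, visible in the diffs below: logical-type conversion setup moves out of the JMeter samplers into a shared AvroSerializersUtil invoked from each serializer's constructor, and the generic-record serializers write with GenericDatumWriter instead of SpecificDatumWriter. A minimal sketch of that pattern follows; the schema, field name, and class name are invented for illustration and are not taken from the repository:

// Hypothetical sketch of the pattern this commit lands on: GenericDatumWriter
// plus a logical-type conversion registered on the shared GenericData singleton.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Instant;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public final class LogicalTypeWriteSketch {

  public static void main(final String[] args) throws IOException {
    final Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    final Schema schema = SchemaBuilder.record("Sample").fields()
        .name("createdAt").type(timestampMillis).noDefault()
        .endRecord();

    // Without this registration, writing a java.time.Instant into a
    // timestamp-millis field fails with a ClassCastException.
    GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

    final GenericRecord record = new GenericData.Record(schema);
    record.put("createdAt", Instant.now());

    // GenericDatumWriter accepts any GenericRecord; SpecificDatumWriter is
    // meant for generated SpecificRecord classes.
    final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      final var encoder = EncoderFactory.get().binaryEncoder(baos, null);
      writer.write(record, encoder);
      encoder.flush();
      System.out.println("serialized " + baos.size() + " bytes");
    }
  }
}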
26 changes: 14 additions & 12 deletions config/checkstyle/OSS_checkstyle_suppressions.xml
@@ -5,16 +5,18 @@
     "https://checkstyle.org/dtds/suppressions_1_2_xpath_experimental.dtd">
 
 <suppressions>
-  <suppress checks="VisibilityModifier" files="[/\\].*model.*\.java"/>
-  <suppress checks="VisibilityModifier" files="[/\\].*EnrichedRecord\.java"/>
-  <suppress checks="DesignForExtension" files="[/\\].*model.*\.java"/>
-  <suppress checks="AbstractClassName" files="[/\\].*Field\.java"/>
-  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Sampler\.java"/>
-  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*PropertyEditor\.java"/>
-  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ProcessorTest\.java"/>
-  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Standalone\.java"/>
-  <suppress checks="ExecutableStatementCount" files="[/\\].*Standalone\.java"/>
-  <suppress checks="AnonInnerLength" files="[/\\].*AutoCompletion\.java"/>
-  <suppress checks="Indentation" files="[/\\].*ExtractorTest\.java"/>
-  <suppress checks="CyclomaticComplexity" files="[/\\].*\.java"/>
+  <suppress checks="VisibilityModifier" files="[/\\].*model.*\.java" />
+  <suppress checks="VisibilityModifier" files="[/\\].*EnrichedRecord\.java" />
+  <suppress checks="DesignForExtension" files="[/\\].*model.*\.java" />
+  <suppress checks="DesignForExtension" files="[/\\].*AvroSerializersTest\.java" />
+  <suppress checks="AbstractClassName" files="[/\\].*Field\.java" />
+  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Sampler\.java" />
+  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*PropertyEditor\.java" />
+  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*ProcessorTest\.java" />
+  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*Standalone\.java" />
+  <suppress checks="ClassDataAbstractionCoupling" files="[/\\].*AvroSerializersUtil\.java" />
+  <suppress checks="ExecutableStatementCount" files="[/\\].*Standalone\.java" />
+  <suppress checks="AnonInnerLength" files="[/\\].*AutoCompletion\.java" />
+  <suppress checks="Indentation" files="[/\\].*ExtractorTest\.java" />
+  <suppress checks="CyclomaticComplexity" files="[/\\].*\.java" />
 </suppressions>
13 changes: 12 additions & 1 deletion pom.xml
@@ -7,7 +7,7 @@
 
   <artifactId>kloadgen</artifactId>
 
-  <version>5.1.3</version>
+  <version>5.1.4</version>
 
   <name>KLoadGen</name>
   <description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial data base on Data specification</description>
@@ -167,6 +167,17 @@
       </roles>
       <timezone>Europe/Madrid</timezone>
     </developer>
+    <developer>
+      <id>davidgarciago</id>
+      <name>David García Gondell</name>
+      <email>[email protected]</email>
+      <organization>Corunet</organization>
+      <organizationUrl>https://corunet.github.io/</organizationUrl>
+      <roles>
+        <role>Junior Developer</role>
+      </roles>
+      <timezone>Europe/Madrid</timezone>
+    </developer>
   </developers>
 
   <scm>
8 changes: 8 additions & 0 deletions schema_registry_docker/conf/kafka/kafka-jaas.conf
@@ -0,0 +1,8 @@
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret";
};

Client {};
53 changes: 53 additions & 0 deletions schema_registry_docker/docker-compose-kafka-auth.yml
@@ -0,0 +1,53 @@
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SASL_ENABLED: "false"
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    volumes:
      - ./conf/kafka/kafka-jaas.conf:/etc/kafka/kafka_server_jaas.conf
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092, SASL_PLAINTEXT://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,SASL_PLAINTEXT://localhost:29092
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8081:8081
    volumes:
      - ./conf:/conf:ro
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: Testers
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema-registry/schema-registry.jaas
  kafka-manager:
    image: kafkamanager/kafka-manager
    environment:
      ZK_HOSTS: zookeeper
    ports:
      - 9000:9000
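
For context, a hypothetical client-side counterpart to the compose file above: producer properties for the SASL_PLAINTEXT listener published on localhost:29092, using the admin credentials declared in kafka-jaas.conf. The topic name and class name are invented:

// Hypothetical producer config for the secured listener defined above.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class SaslPlainProducerSketch {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:29092");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // Must match the user declared in kafka-jaas.conf (user_admin="admin-secret").
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"admin\" password=\"admin-secret\";");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("test-topic", "key", "value"));
    }
  }
}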
18 changes: 0 additions & 18 deletions src/main/java/net/coru/kloadgen/sampler/KafkaConsumerSampler.java
@@ -19,9 +19,6 @@
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import net.coru.kloadgen.util.ProducerKeysHelper;
-import org.apache.avro.Conversions;
-import org.apache.avro.data.TimeConversions;
-import org.apache.avro.generic.GenericData;
 import org.apache.jmeter.config.Arguments;
 import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
 import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
@@ -45,7 +42,6 @@ public final void setupTest(final JavaSamplerContext context) {
     final var props = properties(context);
     final var topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
     consumer = new KafkaConsumer<>(props);
-    configGenericData();
 
     consumer.subscribe(Collections.singletonList(topic));
   }
@@ -60,20 +56,6 @@ public final Properties properties(final JavaSamplerContext context) {
     return props;
   }
 
-  private void configGenericData() {
-    final var genericData = GenericData.get();
-
-    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
-    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
-    genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
-  }
-
   @Override
   public final void teardownTest(final JavaSamplerContext context) {
     if (Objects.nonNull(consumer)) {
32 changes: 12 additions & 20 deletions src/main/java/net/coru/kloadgen/sampler/KafkaProducerSampler.java
@@ -25,9 +25,6 @@
 import net.coru.kloadgen.serializer.ProtobufSerializer;
 import net.coru.kloadgen.util.ProducerKeysHelper;
 import net.coru.kloadgen.util.PropsKeysHelper;
-import org.apache.avro.Conversions;
-import org.apache.avro.data.TimeConversions;
-import org.apache.avro.generic.GenericData;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jmeter.config.Arguments;
 import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
@@ -44,16 +41,27 @@
 public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {
 
   private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s";
+
   private static final Set<String> SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName());
+
   private static final long serialVersionUID = 1L;
+
   private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool();
+
   private transient KafkaProducer<Object, Object> producer;
+
   private String topic;
+
   private String msgKeyType;
+
   private List<String> msgKeyValue;
+
   private boolean keyMessageFlag = false;
+
   private transient BaseLoadGenerator generator;
+
   private transient BaseLoadGenerator keyGenerator;
+
   private transient Properties props;
 
   @Override
@@ -62,8 +70,6 @@ public void setupTest(final JavaSamplerContext context) {
 
     generator = SamplerUtil.configureValueGenerator(props);
 
-    configGenericData();
-
     if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
         || "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
       keyMessageFlag = true;
@@ -72,7 +78,7 @@
       } else {
         msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
         msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
-            ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
+                      ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
       }
     } else {
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
@@ -94,20 +100,6 @@
     return commonProps;
   }
 
-  private void configGenericData() {
-    final var genericData = GenericData.get();
-
-    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
-    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
-    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
-    genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
-  }
-
   @Override
   public void teardownTest(final JavaSamplerContext context) {
     if (Objects.nonNull(producer)) {
src/main/java/net/coru/kloadgen/serializer/AvroSerializer.java
@@ -11,7 +11,6 @@
 import java.nio.ByteBuffer;
 
 import javax.xml.bind.DatatypeConverter;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -27,6 +26,10 @@ public class AvroSerializer<T extends EnrichedRecord> implements Serializer<T> {
 
   private static final int ID_SIZE = 4;
 
+  public AvroSerializer() {
+    AvroSerializersUtil.setupLogicalTypesConversion();
+  }
+
   @Override
   public final byte[] serialize(final String topic, final T data) {
     try {
src/main/java/net/coru/kloadgen/serializer/AvroSerializersUtil.java
@@ -0,0 +1,25 @@
package net.coru.kloadgen.serializer;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;

public final class AvroSerializersUtil {

  private AvroSerializersUtil() {
  }

  public static void setupLogicalTypesConversion() {
    final var genericData = GenericData.get();

    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
    genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
    genericData.addLogicalTypeConversion(new Conversions.UUIDConversion());
  }
}
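
A quick way to see the effect of setupLogicalTypesConversion(): conversions are registered on the process-wide GenericData.get() singleton, so they are visible to any datum writer created afterwards. A hypothetical check follows; this is not the AvroSerializersTest added by the commit:

// Hypothetical check: after setup, the shared GenericData singleton resolves
// a Conversion for BigDecimal against the decimal logical type.
import java.math.BigDecimal;

import net.coru.kloadgen.serializer.AvroSerializersUtil;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;

public final class ConversionRegistrationCheck {

  public static void main(final String[] args) {
    AvroSerializersUtil.setupLogicalTypesConversion();

    final Conversion<BigDecimal> conversion = GenericData.get()
        .getConversionByClass(BigDecimal.class, LogicalTypes.decimal(10, 2));

    // Prints true once DecimalConversion has been registered.
    System.out.println(conversion != null);
  }
}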
src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordBinarySerializer.java
@@ -10,18 +10,24 @@
 import java.io.IOException;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 
 @Slf4j
 public class GenericAvroRecordBinarySerializer<T extends GenericRecord> implements Serializer<T> {
 
+  public GenericAvroRecordBinarySerializer() {
+    AvroSerializersUtil.setupLogicalTypesConversion();
+  }
+
   @Override
   public final byte[] serialize(final String s, final T data) {
-    final var writer = new SpecificDatumWriter<>(data.getSchema());
+    final DatumWriter<T> writer = new GenericDatumWriter<>(data.getSchema());
+
     var result = new byte[]{};
     try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       final var encoder = EncoderFactory.get().binaryEncoder(baos, null);
src/main/java/net/coru/kloadgen/serializer/GenericAvroRecordSerializer.java
@@ -10,20 +10,25 @@
 import java.io.IOException;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 
 @Slf4j
 public class GenericAvroRecordSerializer<T extends GenericRecord> implements Serializer<T> {
 
+  public GenericAvroRecordSerializer() {
+    AvroSerializersUtil.setupLogicalTypesConversion();
+  }
+
   @Override
   public final byte[] serialize(final String topic, final T data) {
-
-    final var writer = new SpecificDatumWriter<>(data.getSchema());
+    final DatumWriter<T> writer = new GenericDatumWriter<>(data.getSchema());
+
     byte[] dataBytes = new byte[0];
     final var stream = new ByteArrayOutputStream();
     final Encoder jsonEncoder;
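
Both generic-record serializers now register the conversions in their constructors, so a producer only has to select them. A hypothetical wiring sketch:

// Hypothetical producer wiring; the serializer's constructor registers the
// logical-type conversions before the first write.
import java.util.Properties;

import net.coru.kloadgen.serializer.GenericAvroRecordSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public final class SerializerWiringSketch {

  public static Properties producerProps(final String bootstrapServers) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroRecordSerializer.class.getName());
    return props;
  }
}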