Solved serializer casting problems using SpecificDatumWriter (#313) #315

Merged
merged 14 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
13 changes: 12 additions & 1 deletion pom.xml
@@ -7,7 +7,7 @@

  <artifactId>kloadgen</artifactId>

  <version>5.1.3</version>
  <version>5.1.5</version>

  <name>KLoadGen</name>
  <description>Load Generation JMeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate artificial data based on data specification</description>
@@ -167,6 +167,17 @@
      </roles>
      <timezone>Europe/Madrid</timezone>
    </developer>
    <developer>
      <id>davidgarciago</id>
      <name>David García Gondell</name>
      <email>[email protected]</email>
      <organization>Corunet</organization>
      <organizationUrl>https://corunet.github.io/</organizationUrl>
      <roles>
        <role>Junior Developer</role>
      </roles>
      <timezone>Europe/Madrid</timezone>
    </developer>
  </developers>

  <scm>
8 changes: 8 additions & 0 deletions schema_registry_docker/conf/kafka/kafka-jaas.conf
@@ -0,0 +1,8 @@
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret";
};

Client {};
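
For reference, a client that talks to the broker's SASL_PLAINTEXT listener needs matching credentials on its side. A minimal sketch of the client properties, assuming the listener address from the compose file below; the class name SaslClientConfigSketch is hypothetical:

import java.util.Properties;

/**
 * Sketch: client-side SASL/PLAIN settings matching the broker JAAS file above.
 */
final class SaslClientConfigSketch {

  static Properties saslProps() {
    final Properties props = new Properties();
    // Address of the SASL_PLAINTEXT listener declared in the compose file below.
    props.put("bootstrap.servers", "localhost:29092");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    // Must match the username/password entries in kafka-jaas.conf.
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"admin\" password=\"admin-secret\";");
    return props;
  }
}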
53 changes: 53 additions & 0 deletions schema_registry_docker/docker-compose-kafka-auth.yml
@@ -0,0 +1,53 @@
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SASL_ENABLED: "false"
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    volumes:
      - ./conf/kafka/kafka-jaas.conf:/etc/kafka/kafka_server_jaas.conf
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092, SASL_PLAINTEXT://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,SASL_PLAINTEXT://localhost:29092
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8081:8081
    volumes:
      - ./conf:/conf:ro
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistry
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: Testers
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema-registry/schema-registry.jaas
  kafka-manager:
    image: kafkamanager/kafka-manager
    environment:
      ZK_HOSTS: zookeeper
    ports:
      - 9000:9000
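
Once the stack is up, the schema registry's BASIC auth can be sanity-checked over HTTP. A minimal sketch, assuming the registry is reachable on localhost:8081; the credentials are placeholders, since the real ones live in the mounted schema-registry.jaas file that is not part of this diff:

import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Sketch: checks the BASIC-auth schema registry started by this compose file. */
final class RegistryCheckSketch {

  public static void main(final String[] args) throws Exception {
    // "user:password" is a placeholder; substitute credentials from
    // /conf/schema-registry/schema-registry.jaas.
    final String token = Base64.getEncoder()
        .encodeToString("user:password".getBytes(StandardCharsets.UTF_8));
    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8081/subjects").openConnection();
    conn.setRequestProperty("Authorization", "Basic " + token);
    System.out.println("HTTP " + conn.getResponseCode()); // expect 200 with valid credentials
  }
}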
GenericAvroRecordBinarySerializer.java
@@ -10,9 +10,12 @@
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@@ -21,7 +24,12 @@ public class GenericAvroRecordBinarySerializer<T extends GenericRecord> implements Serializer<T> {

  @Override
  public final byte[] serialize(final String s, final T data) {
    final var writer = new SpecificDatumWriter<>(data.getSchema());
    final DatumWriter<T> writer;
    if (data instanceof SpecificRecord) {

[Review comment from a Contributor: Missing test]

      writer = new SpecificDatumWriter<>(data.getSchema());
    } else {
      writer = new GenericDatumWriter<>(data.getSchema());
    }
    var result = new byte[]{};
    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      final var encoder = EncoderFactory.get().binaryEncoder(baos, null);
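
Regarding the review comment above, a test along these lines would exercise the new GenericDatumWriter branch. This is a sketch only, assuming the project's JUnit 5 and AssertJ test setup:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

class GenericAvroRecordBinarySerializerTest {

  @Test
  void shouldSerializeAPlainGenericRecord() {
    final Schema schema = SchemaBuilder.record("Sample").fields()
        .requiredString("name")
        .endRecord();
    final GenericRecord record = new GenericData.Record(schema);
    record.put("name", "test");

    final byte[] result = new GenericAvroRecordBinarySerializer<GenericRecord>()
        .serialize("topic", record);

    // Before this change, a plain GenericRecord went through SpecificDatumWriter
    // and could fail; the new branch writes it with GenericDatumWriter instead.
    assertThat(result).isNotEmpty();
  }
}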
GenericAvroRecordSerializer.java
@@ -10,10 +10,13 @@
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

@@ -23,7 +26,13 @@ public class GenericAvroRecordSerializer<T extends GenericRecord> implements Serializer<T> {
  @Override
  public final byte[] serialize(final String topic, final T data) {

    final var writer = new SpecificDatumWriter<>(data.getSchema());
    final DatumWriter<T> writer;
    if (data instanceof SpecificRecord) {

[Review comment from a Contributor: Missing test]

      writer = new SpecificDatumWriter<>(data.getSchema());
    } else {
      writer = new GenericDatumWriter<>(data.getSchema());
    }

    byte[] dataBytes = new byte[0];
    final var stream = new ByteArrayOutputStream();
    final Encoder jsonEncoder;
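
A matching sketch for the JSON-encoding variant, under the same JUnit 5 and AssertJ assumptions:

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

class GenericAvroRecordSerializerTest {

  @Test
  void shouldJsonEncodeAPlainGenericRecord() {
    final Schema schema = SchemaBuilder.record("Sample").fields()
        .requiredString("name")
        .endRecord();
    final GenericRecord record = new GenericData.Record(schema);
    record.put("name", "test");

    final byte[] result = new GenericAvroRecordSerializer<GenericRecord>()
        .serialize("topic", record);

    // The JSON encoder should render the record's field, e.g. {"name":"test"}.
    assertThat(new String(result, StandardCharsets.UTF_8)).contains("name");
  }
}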