
Schema Registry serialized message causes SchemaRegistryException on AWS #304

Closed
horuktaras opened this issue Aug 30, 2024 · 1 comment
Labels
❓ Question Further information is requested

Comments

@horuktaras

We're currently using AWS Glue Schema Registry for storing our schemas. However, due to the limitations of the current library, we were unable to connect directly to Glue. As a workaround, I hard-coded the schema directly into the code. This allowed me to successfully serialize the messages and send them to Kafka. Unfortunately, the next day, the DevOps team notified us of errors related to the serialization of these messages.

The KafkaMessageDTO class, where we generate messages and transform them to Avro:

import RandomUtils from "../../_utils/RandomUtils.js";
import {SchemaRegistry, SCHEMA_TYPE_AVRO, SCHEMA_TYPE_STRING} from "k6/x/kafka"

export class KafkaMessageDTO{
    constructor() {
        //todo: configure SchemaRegistry to connect to AWS Glue when it will be possible
        this.schemaRegistry = new SchemaRegistry();
        this.geoDataAVROSchema =
            {
                "namespace": "com.namespace.example",
                "type": "record",
                "name": "Data",
                "fields": [
                    {
                        "name": "id",
                        "type": "string"
                    },
                    {
                        "name": "type",
                        "type": {
                            "type": "enum",
                            "name": "Type",
                            "symbols": [
                                "123",
                                "321"
                            ]
                        }
                    },
                    {
                        "name": "source",
                        "type": {
                            "type": "enum",
                            "name": "Source",
                            "symbols": [
                                "321",
                                "123"
                            ]
                        }
                    },
                  {...other fields}
                ]
            }
    }


    createGeoDataMessage(id) {
        const random = new RandomUtils();
        const eventTypes = ["321", "123"];
        const eventSource = "123"; // Fixed event source

        let generatedData = {
            id: id,
            type: eventTypes[random.getRandomInt(0, eventTypes.length - 1)],
            source: eventSource,
            {...other fields}
        };

        console.log("JSON message to send to Kafka: " + JSON.stringify(generatedData))

        try {
            let serializedMessage = [
                {
                    key: this.schemaRegistry.serialize({
                        data: id,
                        schemaType: SCHEMA_TYPE_STRING,
                    }),
                    value: this.schemaRegistry.serialize({
                        data: generatedData,
                        schema: {schema: JSON.stringify(this.geoDataAVROSchema)},
                        schemaType: SCHEMA_TYPE_AVRO,
                    }),
                }
            ];
            console.log("Serialized to AVRO message to send to Kafka: " + JSON.stringify(serializedMessage))
            return serializedMessage;

        } catch (error) {
            console.error("Serialization error:", error);
            throw error;
        }
    }
}

export default KafkaMessageDTO;

Test.js, where we send the messages:

const kafka = new KafkaClient("data.vtg.normalized.gps-event");
let msg = new KafkaMessageDTO().createGeoDataMessage(vehicleId);
kafka.produceMessage(msg)

The KafkaClient class with the connection setup:

import {Connection, Reader, SASL_AWS_IAM, TLS_1_2, Writer} from "k6/x/kafka"

export class KafkaClient {

    constructor(topic) {
        const brokers = [
            "b-1.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-3.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-2.....c7.kafka.us-east-1.amazonaws.com:9098"
        ]
        const saslConfig = {
            algorithm: SASL_AWS_IAM
        }

        const tlsConfig = {
            enableTls: true,
            insecureSkipTlsVerify: true,
            minVersion: TLS_1_2,
        }

        const offset = 0
        const partition = 0
        const numPartitions = 1
        const replicationFactor = 1

        this.writer = new Writer({
            brokers: brokers,
            topic: topic,
            sasl: saslConfig,
            tls: tlsConfig
        })
        this.reader = new Reader({
            brokers: brokers,
            topic: topic,
            partition: partition,
            offset: offset,
            sasl: saslConfig,
            tls: tlsConfig
        })
        this.connection = new Connection({
            address: brokers[0],
            sasl: saslConfig,
            tls: tlsConfig,
        })
    }

    produceMessage(message) {

        try {
            this.writer.produce({messages: message});
        } catch (error) {
            console.error("Producing error:", error);
            throw error;
        }
    }
}

export default KafkaClient;

Errors we are receiving:

Unable to deserialize message using <aws_schema_registry.serde.KafkaDeserializer object at 0x7f5e0a383710>: b'\x00\x00\x00\x00\x00\x12937397190\x00\x00\x00(2024-08-21T15:50:42Z\x02(2024-08-21T15:50:42Z(2024-08-21T15:50:42Z\x02\x0e1165551\x10M7738371
20603\x9a\x99sB\x02\x00\x00\x00\x00\x02\x88\xf7
={\xd3\x99\xa0\xc8U\xc0b\x97\xf2k\xb5

_\xc0\x02\x02\x01\x02\x00\x02\x01\x02\x00\x02\x00'

or

aws_schema_registry.exception.SchemaRegistryException: no secondary deserializer provided to handle unrecognized data encoding
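The maintainer's linked explanation is not quoted in this thread, but the raw bytes above are consistent with a wire-format mismatch (this interpretation is an assumption, not confirmed here): xk6-kafka's SchemaRegistry serializer frames Avro payloads in the Confluent wire format (a 0x00 magic byte followed by a 4-byte big-endian schema ID), while the AWS Glue deserializer expects Glue's own framing (a 0x03 header byte, a compression byte, and a 16-byte schema-version UUID). A minimal sketch that tells the two framings apart; `detectWireFormat` is a hypothetical helper, not part of either library:

```javascript
// Hypothetical helper (an assumption, not from the thread): inspect the
// leading bytes of a serialized Kafka message to guess its framing.
//
// Confluent wire format: 0x00 magic byte + 4-byte big-endian schema ID + Avro payload.
// AWS Glue wire format:  0x03 header byte + 1 compression byte + 16-byte schema-version UUID + payload.
function detectWireFormat(bytes) {
    if (bytes.length >= 18 && bytes[0] === 0x03) {
        return { format: "glue", compression: bytes[1] };
    }
    if (bytes.length >= 5 && bytes[0] === 0x00) {
        // Big-endian 32-bit schema ID from bytes 1..4.
        const schemaId = (bytes[1] << 24) | (bytes[2] << 16) | (bytes[3] << 8) | bytes[4];
        return { format: "confluent", schemaId: schemaId };
    }
    return { format: "unknown" };
}
```

Under this reading, the leading `\x00\x00\x00\x00\x00` in the dump above would be Confluent framing with schema ID 0, which Glue cannot resolve and therefore rejects with the "no secondary deserializer" error.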

Do you have any idea why this is happening?

@mostafa
Owner

mostafa commented Sep 11, 2024

@horuktaras I explained it in detail here.

@mostafa mostafa closed this as completed Sep 11, 2024
@mostafa mostafa added the ❓ Question Further information is requested label Sep 11, 2024