Unable to deserialize a wrapper type using unions via Kafka deserializer #5420

lsegv opened this issue Oct 29, 2024 · 2 comments

lsegv commented Oct 29, 2024

I'm trying to simulate a use case of sending messages around in a wrapper type, so that devs can put multiple message types on a single topic if needed.

Here is the type. The purpose is to be able to pass around different messages, or a list or map of different types (and thus avoid creating a bunch of temporary wrapper types and polluting the namespace).

protocol prototype {
	// T* are our specific types in different files
	record T1 {
		int count;
	}

	record T2 {
		int count;
	}

	record T3 {
		int count;
	}

	record T4 {
		int count;
	}

	// this is the wrapper/envelope type, it lets us send around any message from anyone to anyone
	record Msg {
		// this is efficient: Avro encodes the chosen union branch as a small varint index before the payload
		union{
			// developer will have to add the exact type here
			T1,
			T2,
			T3,

			// this will also remove all the redundant list/map wrapper types
			array<union{T1,T2,T3}>,
			map<union{T1,T2,T3}>
		} data;
	}
}
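
With the generated classes, the idea is that the envelope can carry a single record, a list, or a map. Roughly like this (a sketch, assuming the avro-tools-generated SpecificRecord classes Msg, T1, T2, T3 are on the classpath; the union field maps to a plain Object in the generated API):

val single = Msg.newBuilder()
    .setData(T1.newBuilder().setCount(1).build())
    .build()

// array<union{...}> maps to a List, map<union{...}> to a Map with string keys
val batch = Msg.newBuilder()
    .setData(listOf(T1.newBuilder().setCount(1).build(), T2.newBuilder().setCount(2).build()))
    .build()

val keyed = Msg.newBuilder()
    .setData(mapOf("a" to T3.newBuilder().setCount(3).build()))
    .build()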

The docker-compose YAML looks like this.

version: '3.8'

services:
  postgres:
    image: postgres
    environment:
      POSTGRES_USER: apicurio-registry
      POSTGRES_PASSWORD: password
    command: ["postgres", "-c", "log_statement=all"] # verbose logging (comment out if not needed)

  kafka:
    image: quay.io/strimzi/kafka:0.39.0-kafka-3.6.1
    command: [
      "sh", "-c",
      "./bin/kafka-storage.sh format -t $$(./bin/kafka-storage.sh random-uuid) -c ./config/kraft/server.properties && ./bin/kafka-server-start.sh ./config/kraft/server.properties"
    ]
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"

  apicurio-registry:
    image: apicurio/apicurio-registry:3.0.0
    container_name: apicurio-registry
    ports:
      - "8080:8080"
    environment:
      QUARKUS_HTTP_HOST: 0.0.0.0
      QUARKUS_HTTP_PORT: 8080
      APICURIO_DATASOURCE_URL: 'jdbc:postgresql://postgres/apicurio-registry'
      APICURIO_DATASOURCE_USERNAME: apicurio-registry
      APICURIO_DATASOURCE_PASSWORD: password
      APICURIO_STORAGE_KIND: "sql"
      APICURIO_STORAGE_SQL_KIND: "postgresql"
    depends_on:
      - postgres

  apicurio-registry-ui:
    image: apicurio/apicurio-registry-ui:3.0.0
    container_name: apicurio-registry-ui
    ports:
      - "8888:8080"  # Expose Apicurio Registry UI on localhost:8888
    environment:
      API_URL: "http://apicurio-registry:8080/apis/registry/v2"  # Connect the UI to the registry
    depends_on:
      - apicurio-registry

Here is the producer.

import io.apicurio.registry.serde.avro.AvroKafkaSerializer
import io.apicurio.registry.serde.config.SerdeConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory
import java.util.*

fun main() {
    val props = Properties()

    // configure kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SHARED.KAFKA_SERVER)
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer")
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all")

    // configure serialization
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer::class.java.name)

    // configure schema registry
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, SHARED.REGISTRY_URL)
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, true)

    val producer = KafkaProducer<String, Msg>(props)

    var count = 0
    while (true) {
        println("PRESS ENTER TO PUBLISH A MESSAGE ON /${SHARED.TOPIC}")
        readlnOrNull()

        try {
            val topic = SHARED.TOPIC
            val value = Msg.newBuilder()
                .setData(T1.newBuilder().setCount(count++).build())
                .build()
            val record = ProducerRecord<String, Msg>(topic, value)

            println(">> topic = $topic, value = $value")
            producer.send(record)
            producer.flush()
            println("successfully published")
        } catch (throwable: Throwable) {
            LoggerFactory.getLogger("").error("exception while publishing to kafka", throwable)
        }
    }
}

And here is the consumer (Kotlin again, but it's pretty much identical to the Java version in the examples). The only difference is that I had to add props.putIfAbsent("apicurio.registry.use-specific-avro-reader", true).

import io.apicurio.registry.serde.avro.AvroKafkaDeserializer
import io.apicurio.registry.serde.config.SerdeConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*

fun main() {
    val props = Properties()

    // configure kafka
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SHARED.KAFKA_SERVER)
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer")
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // configure serialization
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer::class.java.name)
    props.putIfAbsent("apicurio.registry.use-specific-avro-reader", true)

    // configure service registry
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, SHARED.REGISTRY_URL)

    val consumer: KafkaConsumer<String, Msg> = KafkaConsumer<String, Msg>(props)
    consumer.subscribe(listOf(SHARED.TOPIC))

    while (true) {
        println("PRESS ENTER TO ATTEMPT POLL FOR MORE MESSAGES ON /${SHARED.TOPIC}")
        readlnOrNull()
        try {
            val records = consumer.poll(Duration.ofMillis(100))
            for (record in records) {
                val key = record.key()
                val value = record.value()
                println(">> key = $key, value = $value")

                when (value.data) {
                    is T1 -> println("i've got T1")
                    is T2 -> println("i've got T2")
                    is T3 -> println("i've got T3")
                    is T4 -> println("i've got T4")
                    else -> println("unknown type")
                }
            }
        } catch (throwable: Throwable) {
            LoggerFactory.getLogger("").error("exception while polling from kafka", throwable)
        }
    }
}

Running this and publishing some messages, I get
org.apache.avro.SchemaParseException: Undefined name: "T1" when the consumer attempts to parse the payload.

If I simply drop the Msg wrapper and directly write any of the specific non-wrapper types such as T1, reading it back works with no issue.
Is there something I'm missing? Are unions problematic here? (Both producer and consumer have the exact same version of the schema...)


lsegv commented Oct 29, 2024

The problem is the deserializer used with Kafka: if I do the same thing directly in a single file, writing to a byte buffer and parsing it back, there is no issue. To reproduce this you don't really need any of the code in the original post; just use a union wrapper type in any of the Kafka examples and try to read the payload from the consumer.
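
For completeness, the direct single-file round trip that works looks roughly like this (a minimal sketch using plain Avro with the generated SpecificRecord classes; no Kafka or serdes involved):

import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream

fun roundTrip() {
    val msg = Msg.newBuilder()
        .setData(T1.newBuilder().setCount(42).build())
        .build()

    // write: Msg -> bytes
    val out = ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    SpecificDatumWriter(Msg::class.java).write(msg, encoder)
    encoder.flush()

    // read: bytes -> Msg, with the same schema on both sides
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null)
    val decoded = SpecificDatumReader(Msg::class.java).read(null, decoder)
    check(decoded.data is T1) // the union branch resolves fine here
}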

Any help is greatly appreciated. I couldn't find anything else to try in the docs, only "apicurio.registry.use-specific-avro-reader", which got me specific types instead of generic ones, but unions still don't work and I'm out of ideas.


lsegv commented Oct 29, 2024

To compile the schemas I used avro-tools-1.11.1.jar (maybe it's too old, but then why would deserialization work fine without Kafka and fail only when Kafka is the transport?)...
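
For reference, the generation step was roughly this (assuming the IDL above lives in a file named prototype.avdl; the filenames are illustrative):

java -jar avro-tools-1.11.1.jar idl prototype.avdl prototype.avpr
java -jar avro-tools-1.11.1.jar compile protocol prototype.avpr generated/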

@carlesarnal carlesarnal added this to the 3.0.5 milestone Nov 14, 2024
@carlesarnal carlesarnal self-assigned this Nov 14, 2024
@carlesarnal carlesarnal added the priority/normal and type/bug labels Nov 14, 2024
@carlesarnal carlesarnal moved this to Backlog in Registry 3.0 Nov 14, 2024