Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify how Avro schema ID is serialized on Kafka #41

Closed
lbroudoux opened this issue Dec 18, 2020 · 16 comments
Closed

Specify how Avro schema ID is serialized on Kafka #41

lbroudoux opened this issue Dec 18, 2020 · 16 comments
Labels

Comments

@lbroudoux
Copy link
Collaborator

Reason/Context

AsyncAPI allows to reference Avro schema used for serializing / deserializing messages on a topic and parser has a sample on how to express that with a schema being already into a registry. Nice !

When it comes to serializing Avro data to a Kafka topic, you usually have 2 options :

  • The « old-fashioned one » that is about putting raw Avro binary representation of the message payload,
  • The « modern one » that is about putting the Schema ID + the Avro binary representation of the message payload (see Schema Registry: A quick introduction.

As you have guessed, using one or the other of these options has impact on the consumers ! But as of today, consumer has no mean to know which one of the serialization mode is used.

Moreover, it appears that different SerDes libraries (Confluent, Apicurio and IBM as examples) have different way of serializing the Schema ID. Confluent is using first byte of payload to encode the schema ID whereas Apicurio and IBM SerDes are also able to put the ID into headers.

After exposing this issue on the AsyncAPI Slack, there's also concerns on how to retrieve Schema Registry endpoint URL. As we already have Server information with specific bindings info into AsyncAPI, it can be considered as legit - in the case a Schema Registry is needed to deserialize message - to also allow specification of the Schema Registry URL associated to Server.

Description

I propose to add a new Kafka specific binding attribute like at the Channel level to specify how the Schema ID is encoded. We could call it schemaIDLocation with possible values being header or payload. If binding attribute is missing then it simply means that Schema info is not provided. Depending on location value you may switch to different SerDes implementation for consumer generation or configuration

I also propose to add a new Kafka specific binding attribute at the Server level to specify the Schema Registry URL. Simply name it schemaRegistryUrl.

@lbroudoux lbroudoux added the enhancement New feature or request label Dec 18, 2020
@github-actions
Copy link

Welcome to AsyncAPI. Thanks a lot for reporting your first issue.

Keep in mind there are also other channels you can use to interact with AsyncAPI community. For more details check out this issue.

@dalelane
Copy link
Collaborator

I had a go at illustrating what this would look like. It got a bit long, so instead of putting it in a super-long comment here, I've put it in a stand-alone gist at https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188

@nictownsend
Copy link

I believe - binary/json can be inferred from the message binding content type? application/json or application/octet-stream?

@dalelane
Copy link
Collaborator

dalelane commented Feb 22, 2021

@nictownsend That's a great idea - much simpler! I've updated the gist with that.

@dalelane
Copy link
Collaborator

@lbroudoux I've updated the gist to reflect your remaining comments in Slack. It's slightly different to what you suggested (I put useRegistry alongside the schema registry url in the server binding, because it felt a better fit to make it part of describing server info) but I think I've addressed the gaps you spotted

@nictownsend
Copy link

https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188#how-this-could-be-described-in-asyncapi-3 - I don't think schemaIdLocation: "payload" is enough to infer the sample code byte skipping, we need to know the schemaVendor too.

Also, I think I agree with @lbroudoux over useRegistry - it seems more explicit to say "this channel/operation uses a registry" vs "this server uses a registry", as there may be use cases where the channel contents don't need a schema?

dalelane added a commit to dalelane/bindings that referenced this issue Feb 23, 2021
Adding enough information about how schemas are being
used in Kafka messages to enable an application that
consumes messages serialized using schemas to be
implemented purely from an AsyncAPI spec.

Also introduces two new common, reusable definitions
that can be used as message traits, to make it easier
to quickly specify headers used to capture schema
details.

Contributes to: asyncapi#40
Contributes to: asyncapi#41

Signed-off-by: Dale Lane <[email protected]>
@lbroudoux
Copy link
Collaborator Author

Hi @dalelane and @nictownsend,
Thanks for pursuing discussion : I was quite a bit in a rush these last days.

I agree with @nictownsend with the useRegistry flag - for me it's also more explicit to have it at the channel/operation level than having to browse the servers for that.

Also, to me it seems that the schemaRegistryAvailable flag you introduced is serving a different purpose by telling consumers that "we've used a registry you will not be able to access" and "you'll have to skip the 5 first bytes".

To me useRegistry is much more useful to message producers telling that they'll have to pick up a correct SerDes library and integrate lately with a registry.

What do you think?

I'll also have a review on the PR #55

@github-actions
Copy link

This issue has been automatically marked as stale because it has not had recent activity 😴
It will be closed in 60 days if no further activity occurs. To unstale this issue, add a comment with detailed explanation.
Thank you for your contributions ❤️

@github-actions github-actions bot added the stale label Apr 27, 2021
@derberg derberg removed enhancement New feature or request stale labels Apr 27, 2021
@derberg
Copy link
Member

derberg commented Apr 27, 2021

Hi folks, this one is getting stale, do we plan to push it forward?

@dalelane
Copy link
Collaborator

hi @derberg

yeah, sorry - my to-do list has gotten a bit out of control recently so I've not had time to put into this one for a while

If someone wants to take this and run with it, I certainly won't complain. But if no-one does, I really do want to come back to this and finish it.

@lbroudoux
Copy link
Collaborator Author

Hi!

I think that all the intent of this issue is now embedded into #55 where conversation is going on... I have a review today and will share thoughts and questions on #55 - to not disseminate information.

@github-actions
Copy link

This issue has been automatically marked as stale because it has not had recent activity 😴
It will be closed in 60 days if no further activity occurs. To unstale this issue, add a comment with detailed explanation.
Thank you for your contributions ❤️

@derberg
Copy link
Member

derberg commented Aug 30, 2021

Hey folks, stale bot killed #55 and killed this issue too. I on my own can't really proceed with this as you are the ones using Avro in production and know the best direction we should take here. Feel free to reopen, or just comment whenever you want to continue with these

@dalelane
Copy link
Collaborator

Sorry about that - I've been a bit distracted the last couple months between new job, holiday, etc.

Let me pick this up again this week, and I'll re-open when I have something worth sharing.

@tanujazz3
Copy link

Can anyone please help why I am not able to run and execute the generator for the remote using
ag ./asyncapi.yaml @asyncapi/java-spring-template -o output -p user="username"-p password="password"

I have seen some examples where they are providing the schema registry url under Url-->Binding

Here is how my asyncapi.yaml looks like

asyncapi: 2.4.0
info:
title: Sample Service
version: 1.0.0
description: This service reads the schema from remote confluent schema registry
servers:
development:
url: "url:port"
description: Development server
protocol: kafka
protocolVersion: 1.0.0
bindings:
kafka:
schemaRegistryUrl: "schemaRegistryUrl"
schemaRegistryVendor: "confluent"
schemaRegistryAvailable: true
channels:
Sample-output-topic:
publish:
bindings:
kafka:
message:
name: TempratureReading
schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
contentType: 'application/octet-stream'
payload:
$ref: 'Schema url'
title: ConsumerRecord
Sample-output-topic::
subscribe:
bindings:
kafka: {}
message:
name: TempratureReading
schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
contentType: 'application/octet-stream'
payload:
$ref: 'Schema url'
title: Customer

@dalelane
Copy link
Collaborator

@tanujazz3 Hello!

Your question doesn't look like it is related to this closed issue - would you mind opening a new issue so this can be investigated, please?

I think https://github.com/asyncapi/java-spring-template/issues would be the best place to do that. And please can you include an example of the output you get when you run the command.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

5 participants