-
Notifications
You must be signed in to change notification settings - Fork 338
Encoding messages with Avro
Avro is an efficient binary encoding format that allows for schema evolution, making it well suited for data processing systems. Avro uses the notion of writer schemas and reader schemas to limit the metadata needed to be encoded into payloads. The writer schema is the schema used when encoding the data; the reader schema is the schema used when decoding it. These need not be identical, but they do need to be compatible. In fact, when decoding, the reader needs access to both the reader schema and the writer schema, in order to be able to create a compatibility mapping between the two.
When using Avro in conjunction with Kafka, it is common to use a Schema Registry to store writer schemas. When writing a message, the schema used must be copied to the registry and the data must be tagged with a unique schema id. When reading a message, the writer schema is fetched from the registry based on the schema id.
While this sounds complicated, it's mostly automated when using AvroTurf, which handles both encoding/decoding and schema registration.
require "avro_turf"
require "kafka"
# You need to pass the URL of your Schema Registry.
avro = AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/")
# Encoding data has the side effect of registering the schema. This only
# happens the first time a schema is used with the instance of `AvroTurf`.
data = avro.encode({ "title" => "hello, world" }, schema_name: "greeting")
# Assuming you've set up `producer`:
producer.produce(data, topic: "greetings")
require "avro_turf"
require "kafka"
# You need to pass the URL of your Schema Registry.
avro = AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/")
kafka.each_message(topic: "greetings") do |message|
# By passing in `schema_name:`, you guard against the producer changing
# the schema in an incompatible way. You can leave out the argument, in
# which case you'll just get whatever data the producer encoded out, with
# no schema compatibility guarantees.
greeting = avro.decode(message.value, schema_name: "greeting")
greeting #=> { "title" => "hello, world" }
end
Note that if you're only interested in a subset of the fields in the Avro data, you can modify your reader schema to only include those fields. This is an efficient way of selectively decoding only parts of the data.