Skip to content

Commit

Permalink
WIP: export schema registry client and the serdes functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed Aug 7, 2022
1 parent 91cf77c commit 7b25137
Show file tree
Hide file tree
Showing 43 changed files with 1,567 additions and 2,038 deletions.
184 changes: 26 additions & 158 deletions avro.go
Original file line number Diff line number Diff line change
@@ -1,175 +1,43 @@
package kafka

import (
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"
)

const (
AvroSerializer string = "io.confluent.kafka.serializers.KafkaAvroSerializer"
AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)

// SerializeAvro serializes the given data to wire-formatted Avro binary format and returns it
// as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise
// it uses the given schema to manually create the codec and encode the data. The configuration
// is used to configure the Schema Registry client. The element is used to define the subject.
// The data should be a string.
// nolint: funlen
func SerializeAvro(
configuration Configuration, topic string, data interface{},
element Element, schema string, version int,
) ([]byte, *Xk6KafkaError) {
var bytesData []byte
if stringData, ok := data.(string); ok {
bytesData = []byte(stringData)
} else {
return nil, NewXk6KafkaError(failedTypeCast, "Failed to cast to string", nil)
}

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)

subject, subjectNameError := GetSubjectName(schema, topic, element, configuration.Producer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
}

var schemaInfo *srclient.Schema
schemaID := 0

var xk6KafkaError *Xk6KafkaError

if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
}

if xk6KafkaError != nil {
logger.WithField("error", xk6KafkaError).Warn(
"Failed to create or get schema, manually encoding the data")
codec, err := goavro.NewCodec(schema)
if err != nil {
return nil, NewXk6KafkaError(failedCreateAvroCodec,
"Failed to create codec for encoding Avro",
err)
}

avroEncodedData, _, err := codec.NativeFromTextual(bytesData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeToAvro,
"Failed to encode data into Avro",
err)
}
type AvroSerde struct {
Serdes
}

bytesData, err = codec.BinaryFromNative(nil, avroEncodedData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeAvroToBinary,
"Failed to encode Avro data into binary",
err)
}
func (*AvroSerde) Serialize(data interface{}, schema *Schema) ([]byte, error) {
jsonBytes, err := toJSONBytes(data)
if err != nil {
return nil, err
}

if schemaInfo != nil {
schemaID = schemaInfo.ID()

// Encode the data into Avro and then the wire format
avroEncodedData, _, err := schemaInfo.Codec().NativeFromTextual(bytesData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeToAvro,
"Failed to encode data into Avro",
err)
}

bytesData, err = schemaInfo.Codec().BinaryFromNative(nil, avroEncodedData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeAvroToBinary,
"Failed to encode Avro data into binary",
err)
}
encodedData, _, err := schema.Codec().NativeFromTextual(jsonBytes)
if err != nil {
err := NewXk6KafkaError(failedToEncode, "Failed to encode data", err)
return nil, err
}

return EncodeWireFormat(bytesData, schemaID), nil
}

// DeserializeAvro deserializes the given data from wire-formatted Avro binary format and returns it
// as a byte array. It uses the given version to retrieve the schema from Schema Registry, otherwise
// it uses the given schema to manually create the codec and decode the data. The configuration
// is used to configure the Schema Registry client. The element is used to define the subject.
// The data should be a byte array.
// nolint: funlen
func DeserializeAvro(
configuration Configuration, topic string, data []byte,
element Element, schema string, version int,
) (interface{}, *Xk6KafkaError) {
schemaID, bytesDecodedData, err := DecodeWireFormat(data)
bytesData, err := schema.Codec().BinaryFromNative(nil, encodedData)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
err := NewXk6KafkaError(failedToEncodeToBinary,
"Failed to encode data into binary",
err)
return nil, err
}

var schemaInfo *srclient.Schema
var xk6KafkaError *Xk6KafkaError
var getSchemaError error

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
return bytesData, nil
}

subject, subjectNameError := GetSubjectName(schema, topic, element, configuration.Consumer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
func (*AvroSerde) Deserialize(data []byte, schema *Schema) (interface{}, error) {
decodedData, _, err := schema.Codec().NativeFromBinary(data)
if err != nil {
err := NewXk6KafkaError(
failedToDecodeFromBinary, "Failed to decode data", err)
return nil, err
}

// nolint: gocritic
if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
} else if configuration.Consumer.UseMagicPrefix {
// Schema is not provided and no valid version flag,
// so we use te schemaID in the magic prefix
schemaInfo, getSchemaError = client.GetSchema(schemaID)
if getSchemaError != nil {
xk6KafkaError = NewXk6KafkaError(failedCreateAvroCodec,
"Failed to get schema by magic prefix",
getSchemaError)
}
if data, ok := decodedData.(map[string]interface{}); ok {
return data, nil
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
return nil, ErrInvalidDataType
}

if xk6KafkaError != nil {
logger.WithField("error", xk6KafkaError).Warn(
"Failed to create or get schema, manually decoding the data")
codec, err := goavro.NewCodec(schema)
if err != nil {
return nil, NewXk6KafkaError(failedCreateAvroCodec,
"Failed to create codec for decoding Avro",
err)
}

avroDecodedData, _, err := codec.NativeFromBinary(bytesDecodedData)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeAvroFromBinary,
"Failed to decode data from Avro",
err)
}

return avroDecodedData, nil
}

if schemaInfo != nil {
// Decode the data from Avro
avroDecodedData, _, err := schemaInfo.Codec().NativeFromBinary(bytesDecodedData)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeAvroFromBinary,
"Failed to decode data from Avro",
err)
}
return avroDecodedData, nil
}

return bytesDecodedData, nil
}
35 changes: 13 additions & 22 deletions bytearray.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
package kafka

import (
"github.com/riferrei/srclient"
)
import "github.com/riferrei/srclient"

const (
ByteArray srclient.SchemaType = "BYTEARRAY"
type ByteArraySerde struct {
Serdes
}

ByteArraySerializer string = "org.apache.kafka.common.serialization.ByteArraySerializer"
ByteArrayDeserializer string = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
const (
Bytes srclient.SchemaType = "BYTES"
)

// SerializeByteArray serializes the given data into a byte array and returns it.
// If the data is not a byte array, an error is returned. The configuration, topic, element,
// schema and version are just used to conform with the interface.
func SerializeByteArray(
configuration Configuration, topic string, data interface{},
element Element, schema string, version int,
) ([]byte, *Xk6KafkaError) {
// Serialize serializes the given data into a byte array.
func (*ByteArraySerde) Serialize(data interface{}, schema *Schema) ([]byte, error) {
switch data := data.(type) {
case []byte:
return data, nil
case []interface{}:
arr := make([]byte, len(data))
for i, u := range data {
if u, ok := u.(float64); ok {
arr[i] = byte(u)
} else {
return nil, NewXk6KafkaError(failedTypeCast, "Failed to cast to float64", nil)
return nil, ErrFailedTypeCast
}
}
return arr, nil
Expand All @@ -34,12 +30,7 @@ func SerializeByteArray(
}
}

// DeserializeByteArray deserializes the given data from a byte array and returns it.
// It just returns the data as is. The configuration, topic, element, schema and version
// are just used to conform with the interface.
func DeserializeByteArray(
configuration Configuration, topic string, data []byte,
element Element, schema string, version int,
) (interface{}, *Xk6KafkaError) {
// DeserializeByteArray returns the data as-is, because it is already a byte array.
func (*ByteArraySerde) Deserialize(data []byte, schema *Schema) (interface{}, error) {
return data, nil
}
37 changes: 0 additions & 37 deletions bytearray_test.go

This file was deleted.

45 changes: 0 additions & 45 deletions configuration.go

This file was deleted.

Loading

0 comments on commit 7b25137

Please sign in to comment.