Skip to content

Commit

Permalink
Cherry pick bug fix for issue #3388 (#3389)
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Assuied <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
  • Loading branch information
passuied and ItalyPaleAle authored Apr 1, 2024
1 parent 41b53fe commit 5071cbb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
10 changes: 10 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ func getSchemaSubject(topic string) string {
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
if message.Value == nil {
return []byte("null"), nil
}

switch config.ValueSchemaType {
case Avro:
srClient, err := k.getSchemaRegistyClient()
Expand Down Expand Up @@ -342,6 +347,11 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error)
}

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
if data == nil {
return nil, nil
}

valueSchemaType, err := GetValueSchemaType(metadata)
if err != nil {
return nil, err
Expand Down
69 changes: 52 additions & 17 deletions common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@ import (

func TestGetValueSchemaType(t *testing.T) {
t.Run("No Metadata, return None", func(t *testing.T) {
act, _ := GetValueSchemaType(nil)
act, err := GetValueSchemaType(nil)
require.Equal(t, None, act)
require.NoError(t, err)
})

t.Run("No valueSchemaType, return None", func(t *testing.T) {
act, _ := GetValueSchemaType(make(map[string]string))
act, err := GetValueSchemaType(make(map[string]string))
require.Equal(t, None, act)
require.NoError(t, err)
})

t.Run("valueSchemaType='AVRO', return AVRO", func(t *testing.T) {
act, _ := GetValueSchemaType(map[string]string{"valueSchemaType": "AVRO"})
act, err := GetValueSchemaType(map[string]string{"valueSchemaType": "AVRO"})
require.Equal(t, Avro, act)
require.NoError(t, err)
})

t.Run("valueSchemaType='None', return None", func(t *testing.T) {
act, _ := GetValueSchemaType(map[string]string{"valueSchemaType": "None"})
act, err := GetValueSchemaType(map[string]string{"valueSchemaType": "None"})
require.Equal(t, None, act)
require.NoError(t, err)
})

t.Run("valueSchemaType='XXX', return Error", func(t *testing.T) {
Expand Down Expand Up @@ -79,10 +83,22 @@ func TestDeserializeValue(t *testing.T) {
Value: recordValue,
Topic: "my-topic",
}
act, _ := k.DeserializeValue(&msg, handlerConfig)
act, err := k.DeserializeValue(&msg, handlerConfig)
var actMap map[string]any
json.Unmarshal(act, &actMap)
require.Equal(t, testValue1, actMap)
require.NoError(t, err)
})

t.Run("Data null, return as JSON null", func(t *testing.T) {
msg := sarama.ConsumerMessage{
Key: []byte("my_key"),
Value: nil,
Topic: "my-topic",
}
act, err := k.DeserializeValue(&msg, handlerConfig)
require.Equal(t, []byte("null"), act)
require.NoError(t, err)
})

t.Run("Invalid too short data, return error", func(t *testing.T) {
Expand Down Expand Up @@ -164,25 +180,28 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{})

require.Equal(t, valJSON, act)
require.NoError(t, err)
})

t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "None"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "None"})

require.Equal(t, valJSON, act)
require.NoError(t, err)
})

t.Run("valueSchemaType set to None, leave value as is", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "NONE"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "NONE"})

require.Equal(t, valJSON, act)
require.NoError(t, err)
})

t.Run("valueSchemaType invalid, return error", func(t *testing.T) {
Expand All @@ -195,8 +214,16 @@ func TestSerializeValueCachingDisabled(t *testing.T) {

t.Run("schema found, serialize value as Avro binary", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)
act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})

t.Run("value published null, no error", func(t *testing.T) {
act, err := k.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"})

require.Nil(t, act)
require.NoError(t, err)
})

t.Run("invalid data, return error", func(t *testing.T) {
Expand All @@ -220,14 +247,16 @@ func TestSerializeValueCachingEnabled(t *testing.T) {

t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)
act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{})
require.Equal(t, valJSON, act)
require.NoError(t, err)
})

t.Run("schema found, serialize value as Avro binary", func(t *testing.T) {
valJSON, _ := json.Marshal(testValue1)
act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})
}

Expand All @@ -250,12 +279,14 @@ func TestLatestSchemaCaching(t *testing.T) {

valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)

// Call a 2nd time within TTL and make sure it's not called again
act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})

t.Run("Caching enabled, when cache entry expires, call GetLatestSchema() again", func(t *testing.T) {
Expand All @@ -270,14 +301,16 @@ func TestLatestSchemaCaching(t *testing.T) {

valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)

time.Sleep(2 * time.Second)

// Call a 2nd time within TTL and make sure it's not called again
act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})

t.Run("Caching disabled, call GetLatestSchema() twice", func(t *testing.T) {
Expand All @@ -292,13 +325,15 @@ func TestLatestSchemaCaching(t *testing.T) {

valJSON, _ := json.Marshal(testValue1)

act, _ := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err := k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})

assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)

// Call a 2nd time within TTL and make sure it's not called again
act, _ = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})
act, err = k.SerializeValue("my-topic", valJSON, map[string]string{"valueSchemaType": "Avro"})

assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})
}

0 comments on commit 5071cbb

Please sign in to comment.