diff --git a/events/kafka.go b/events/kafka.go index 12d17091..abbc03f3 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -2,6 +2,10 @@ package events +import ( + "encoding/json" +) + type KafkaEvent struct { EventSource string `json:"eventSource"` EventSourceARN string `json:"eventSourceArn"` @@ -10,12 +14,37 @@ type KafkaEvent struct { } type KafkaRecord struct { - Topic string `json:"topic"` - Partition int64 `json:"partition"` - Offset int64 `json:"offset"` - Timestamp MilliSecondsEpochTime `json:"timestamp"` - TimestampType string `json:"timestampType"` - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` - Headers []map[string][]byte `json:"headers"` + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` + Headers []map[string]JSONNumberBytes `json:"headers"` +} + +// JSONNumberBytes represents array of bytes in Headers field. +type JSONNumberBytes []byte + +// MarshalJSON converts byte array into array of signed integers. +func (b JSONNumberBytes) MarshalJSON() ([]byte, error) { + signedNumbers := make([]int8, len(b)) + for i, value := range b { + signedNumbers[i] = int8(value) + } + return json.Marshal(signedNumbers) +} + +// UnmarshalJSON converts a given json with potential negative values into byte array. +func (b *JSONNumberBytes) UnmarshalJSON(data []byte) error { + var signedNumbers []int8 + if err := json.Unmarshal(data, &signedNumbers); err != nil { + return err + } + *b = make(JSONNumberBytes, len(signedNumbers)) + for i, value := range signedNumbers { + (*b)[i] = byte(value) + } + return nil } diff --git a/events/kafka_test.go b/events/kafka_test.go index ca367654..f4ad6577 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -20,6 +20,14 @@ func TestKafkaEventMarshaling(t *testing.T) { t.Errorf("could not unmarshal event. details: %v", err) } + // expected values for header + var headerValues [5]byte + headerValues[0] = 118 + headerValues[1] = 220 // -36 + 256 + headerValues[2] = 0 + headerValues[3] = 127 + headerValues[4] = 128 // -128 + 256 + assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") assert.Equal(t, inputEvent.EventSource, "aws:kafka") assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4") @@ -33,8 +41,9 @@ func TestKafkaEventMarshaling(t *testing.T) { for _, header := range record.Headers { for key, value := range header { assert.Equal(t, key, "headerKey") - headerValue := string(value) - assert.Equal(t, headerValue, "headerValue") + for i, headerValue := range value { + assert.Equal(t, headerValue, headerValues[i]) + } } } } diff --git a/events/testdata/kafka-event.json b/events/testdata/kafka-event.json index d84b1c61..1cf887cd 100644 --- a/events/testdata/kafka-event.json +++ b/events/testdata/kafka-event.json @@ -14,7 +14,13 @@ "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", "headers": [ { - "headerKey": "aGVhZGVyVmFsdWU=" + "headerKey": [ + 118, + -36, + 0, + 127, + -128 + ] } ] }