diff --git a/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/extractor.go b/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/extractor.go index e331a5570..fdfb3c513 100644 --- a/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/extractor.go +++ b/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/extractor.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -25,8 +26,9 @@ import ( ) var ( - structuredDataType = reflect.TypeOf(opencdc.StructuredData{}) - byteType = reflect.TypeOf(byte(0)) + structuredDataType = reflect.TypeFor[opencdc.StructuredData]() + byteType = reflect.TypeFor[byte]() + timeType = reflect.TypeFor[time.Time]() ) // extractor exposes a way to extract an Avro schema from a Go value. @@ -89,6 +91,13 @@ func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro case reflect.Map: return e.extractMap(path, v, t) case reflect.Struct: + switch t { + case timeType: + return avro.NewPrimitiveSchema( + avro.Long, + avro.NewPrimitiveLogicalSchema(avro.TimestampMicros), + ), nil + } return e.extractStruct(path, v, t) } // Invalid, Uintptr, UnsafePointer, Uint64, Uint, Complex64, Complex128, Chan, Func diff --git a/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/schema_test.go b/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/schema_test.go index a52a8f599..5281bfeed 100644 --- a/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/schema_test.go +++ b/pkg/plugin/processor/builtin/impl/avro/schemaregistry/avro/schema_test.go @@ -17,6 +17,7 @@ package avro import ( "fmt" "testing" + "time" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit/pkg/foundation/cerrors" @@ -25,6 +26,8 @@ import ( ) func TestSchema_MarshalUnmarshal(t *testing.T) { + now := time.Now().UTC() + testCases := []struct { name string // haveValue is the value we use to extract the schema and which gets marshaled @@ -498,11 +501,13 @@ func TestSchema_MarshalUnmarshal(t *testing.T) { "foo": "bar", "bar": 1, "baz": []int{1, 2, 3}, + "tz": now, }, wantValue: map[string]any{ // structured data is unmarshaled into a map "foo": "bar", "bar": 1, "baz": []any{1, 2, 3}, + "tz": now.Truncate(time.Microsecond), // Avro cannot does not support nanoseconds }, wantSchema: must(avro.NewRecordSchema( "record.foo", @@ -511,6 +516,7 @@ func TestSchema_MarshalUnmarshal(t *testing.T) { must(avro.NewField("foo", avro.NewPrimitiveSchema(avro.String, nil))), must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Int, nil))), must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)))), + must(avro.NewField("tz", avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros)))), }, )), }}