Skip to content

Commit

Permalink
Add handling for time.Time in avro encode processor (#1650)
Browse files Browse the repository at this point in the history
* Add handling for time.Time in avro schema extractor

This change is only applicable for builtin connectors, since complex types such as `time.Time` are not
allowed in standalone connector due to restrictions in the plugin framework.

* change

* avro will truncate to microsecond; fix test accordingly

---------

Co-authored-by: Lyubo Kamenov <[email protected]>
  • Loading branch information
samirketema and lyuboxa authored Jun 12, 2024
1 parent 73478fd commit 4d286f4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/hamba/avro/v2"
)

var (
structuredDataType = reflect.TypeOf(opencdc.StructuredData{})
byteType = reflect.TypeOf(byte(0))
structuredDataType = reflect.TypeFor[opencdc.StructuredData]()
byteType = reflect.TypeFor[byte]()
timeType = reflect.TypeFor[time.Time]()
)

// extractor exposes a way to extract an Avro schema from a Go value.
Expand Down Expand Up @@ -89,6 +91,13 @@ func (e extractor) extract(path []string, v reflect.Value, t reflect.Type) (avro
case reflect.Map:
return e.extractMap(path, v, t)
case reflect.Struct:
switch t {
case timeType:
return avro.NewPrimitiveSchema(
avro.Long,
avro.NewPrimitiveLogicalSchema(avro.TimestampMicros),
), nil
}
return e.extractStruct(path, v, t)
}
// Invalid, Uintptr, UnsafePointer, Uint64, Uint, Complex64, Complex128, Chan, Func
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package avro
import (
"fmt"
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -25,6 +26,8 @@ import (
)

func TestSchema_MarshalUnmarshal(t *testing.T) {
now := time.Now().UTC()

testCases := []struct {
name string
// haveValue is the value we use to extract the schema and which gets marshaled
Expand Down Expand Up @@ -498,11 +501,13 @@ func TestSchema_MarshalUnmarshal(t *testing.T) {
"foo": "bar",
"bar": 1,
"baz": []int{1, 2, 3},
"tz": now,
},
wantValue: map[string]any{ // structured data is unmarshaled into a map
"foo": "bar",
"bar": 1,
"baz": []any{1, 2, 3},
"tz": now.Truncate(time.Microsecond), // Avro cannot does not support nanoseconds
},
wantSchema: must(avro.NewRecordSchema(
"record.foo",
Expand All @@ -511,6 +516,7 @@ func TestSchema_MarshalUnmarshal(t *testing.T) {
must(avro.NewField("foo", avro.NewPrimitiveSchema(avro.String, nil))),
must(avro.NewField("bar", avro.NewPrimitiveSchema(avro.Int, nil))),
must(avro.NewField("baz", avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)))),
must(avro.NewField("tz", avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros)))),
},
)),
}}
Expand Down

0 comments on commit 4d286f4

Please sign in to comment.