diff --git a/go.mod b/go.mod index 0dd40a914..061b0052e 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,6 @@ require ( github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/go-plugin v1.5.2 github.com/jackc/pgx/v5 v5.4.3 - github.com/jinzhu/copier v0.4.0 github.com/jpillora/backoff v1.0.0 github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a github.com/matryer/is v1.4.1 diff --git a/go.sum b/go.sum index 84c24d51e..30f143b75 100644 --- a/go.sum +++ b/go.sum @@ -435,8 +435,6 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0 github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ= github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= -github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8= -github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/pkg/processor/procjs/processor_test.go b/pkg/processor/procjs/processor_test.go index ee8b09852..0e5cdb86b 100644 --- a/pkg/processor/procjs/processor_test.go +++ b/pkg/processor/procjs/processor_test.go @@ -21,7 +21,6 @@ import ( "testing" "time" - sdk "github.com/conduitio/conduit-connector-sdk" "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/processor" @@ -414,7 +413,7 @@ func TestJSProcessor_Inspect(t *testing.T) { Position: record.Position("test-pos"), Operation: record.OperationUpdate, Metadata: record.Metadata{"test": "true"}, - Key: sdk.RawData("test-key"), + Key: record.RawData{Raw: []byte("test-key")}, Payload: record.Change{}, } recOut, err := underTest.Process(ctx, recIn) diff --git a/pkg/record/record.go b/pkg/record/record.go index f75a14dae..019050ae3 100644 --- a/pkg/record/record.go +++ b/pkg/record/record.go @@ -17,6 +17,7 @@ package record import ( + "bytes" "encoding/base64" "encoding/json" "fmt" @@ -25,7 +26,6 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/record/schema" - "github.com/jinzhu/copier" ) const ( @@ -149,27 +149,39 @@ func (r Record) mapData(d Data) interface{} { } func (r Record) Clone() Record { - clone := Record{} - // todo copier uses reflection under the hood - // we should optimize it, because Clone() is on a hot path. - // https://github.com/ConduitIO/conduit/issues/885 - err := copier.CopyWithOption( - &clone, - &r, - copier.Option{DeepCopy: true, IgnoreEmpty: true}, + var ( + metadata map[string]string + key Data + payloadBefore Data + payloadAfter Data ) - if err != nil { - // At the moment, a clone error cannot happen because of the input data - // (i.e. the record itself). - // It can only happen if the clone destination is not addressable - // or if reflect.ValueOf(&r) is invalid. - // The first should never happen, because &Record{} is always addressable. - // The second, reflect.ValueOf(&r) is invalid if &r is nil, which in our case - // is also not possible. - // Hence, if the copier returns an error, it's a bug related to how we use copier - // which would cause all pipelines to fail anyway, so we panic here. - // This also makes the method signature simpler. - panic(cerrors.Errorf("record clone error: %w", err)) + + if r.Metadata != nil { + metadata = make(map[string]string, len(r.Metadata)) + for k, v := range r.Metadata { + metadata[k] = v + } + } + + if r.Key != nil { + key = r.Key.Clone() + } + if r.Payload.Before != nil { + payloadBefore = r.Payload.Before.Clone() + } + if r.Payload.After != nil { + payloadAfter = r.Payload.After.Clone() + } + + clone := Record{ + Position: bytes.Clone(r.Position), + Operation: r.Operation, + Metadata: metadata, + Key: key, + Payload: Change{ + Before: payloadBefore, + After: payloadAfter, + }, } return clone } @@ -204,6 +216,7 @@ func (p Position) String() string { // Data are RawData and StructuredData. type Data interface { Bytes() []byte + Clone() Data } // StructuredData contains data in form of a map with string keys and arbitrary @@ -220,6 +233,18 @@ func (d StructuredData) Bytes() []byte { return b } +func (d StructuredData) Clone() Data { + cloned := make(map[string]any, len(d)) + for k, v := range d { + if vmap, ok := v.(map[string]any); ok { + cloned[k] = StructuredData(vmap).Clone() + } else { + cloned[k] = v + } + } + return StructuredData(cloned) +} + // RawData contains unstructured data in form of a byte slice. type RawData struct { Raw []byte @@ -239,3 +264,10 @@ func (d *RawData) UnmarshalText() ([]byte, error) { func (d RawData) Bytes() []byte { return d.Raw } + +func (d RawData) Clone() Data { + return RawData{ + Raw: bytes.Clone(d.Raw), + Schema: d.Schema, // this field is currently unused, we don't care about cloning it atm + } +}